In [3]:
import numpy as np
import scipy.io
from scipy.optimize import minimize, minimize_scalar, Bounds
import pickle
import matplotlib.pyplot as plt
from time import time
from sklearn.metrics import mean_squared_error, mean_absolute_error

class PQC:
    Z = np.matrix([[1, 0], [0, -1]])
    CNOT = np.matrix([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 1], [0, 0, 1, 0]])

    def __init__(self, n_qubits, n_layers, error, weights, params=None, seed=42):
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.error = error
        self.weights = weights
        self.theta_params = self.n_qubits * 3 * self.n_layers

        self.Hamiltonian_operator = np.kron(np.eye(2 ** (self.n_qubits - 1), 2 ** (self.n_qubits - 1)), self.Z)
        self.L_w = []
        self.ws = []

        np.random.seed(seed)
        self.w0 = 1

        if params is None:
            self.params = np.random.uniform(0, 2 * np.pi, size=self.theta_params)
        else:
            self.params = params

        self.set_params(self.params)

    def load_grid(self, u_file='u_heat.mat', x_file='x_heat.mat', t_file='t_heat.mat'):
        self.u_file = u_file
        self.x_file = x_file
        self.t_file = t_file
        _u = scipy.io.loadmat(self.u_file)  # u loads as a dictionary
        _x = scipy.io.loadmat(self.x_file)  # u loads as a dictionary
        _t = scipy.io.loadmat(self.t_file)  # u loads as a dictionary
        self.u_real = np.transpose(_u['u'])  # x rows and t columns
        self.x_real = np.transpose(_x['x'])
        self.t_real = _t['t']
        self.x_points = self.x_real.size  # Number of space data points
        self.t_points = self.t_real.size  # Number of time data points
        
        Xl = np.full((self.t_points, 2), self.x_real[0][0])
        for i in range(self.t_points):
            Xl[i][0] = self.t_real[0][i]

        Xr = np.full((self.t_points, 2), self.x_real[-1][0])
        for i in range(self.t_points):
            Xr[i][0] = self.t_real[0][i]

        T0 = np.full((self.x_points - 2, 2), self.t_real[0][0])
        for i in range(self.x_points - 2):
            T0[i][1] = self.x_real[i + 1][0]

        self.BC_Points = np.concatenate([Xl, Xr, T0])
        Ul = self.u_real[0]
        Ur = self.u_real[-1]
        U0 = np.transpose(self.u_real)[0][1:-1]
        self.BC_Data = np.concatenate([Ul, Ur, U0])

        self.T = np.full((self.x_points, self.t_points), self.t_real)  # x_points x t_points
        self.X = np.full((self.x_points, self.t_points), self.x_real)

        self.Points = np.transpose((self.T.flatten(), self.X.flatten()))  # Cada element: [t x]
        Y = self.u_real.reshape((self.u_real.size, 1))

        # Add noise
        noise_level = 0

        # Gaussian noise
        # std = noise_level * np.std(Y)  # for %5 Gaussian noise
        # mu = 0
        # noise = np.random.normal(mu, std, size=Y.shape)
        # y_noisy = Y + noise

        # Normal distribution
        y_noisy = Y + noise_level * np.std(Y) * np.random.randn(Y[:, 0].size, 1)

        self.Y_noisy = np.transpose(y_noisy)

        self.N = self.Y_noisy.size  # Number of total data points
        self.M = self.BC_Data.size  # Number of total bc data points

        self.initials = self.initialize_grid(self.Points)
        self.BC_initials = self.initialize_grid(self.BC_Points)
    
    def save_params(self, filename):  # save params in a file
        with open(filename, 'wb') as outp:  # Overwrites any existing file.
            pickle.dump(self.params, outp)

    def load_params(self, filename):  # load params from a file
        with open(filename, 'rb') as inp:
            # self.params = pickle.load(inp)
            self.set_params(self, pickle.load(inp))
            
    def set_params(self, params):
        self.params = params
        self.set_U_theta()


    # Define the nonlinearity function
    def phi(self, x_i, j):  # Chebyshev
        return 2 * j * np.arccos(x_i)

    def dphi1(self, x):
        return -1 / np.sqrt(1 - x ** 2)

    def dphi2(self, x):
        return -x / ((1 - x ** 2) ** (3 / 2))

    def Rx(self, alpha):
        return np.matrix([[np.cos(alpha / 2), -1j * np.sin(alpha / 2)], [-1j * np.sin(alpha / 2), np.cos(alpha / 2)]])

    def Ry(self, alpha):
        return np.matrix([[np.cos(alpha / 2), -np.sin(alpha / 2)], [np.sin(alpha / 2), np.cos(alpha / 2)]])

    def Rz(self, alpha):
        return np.matrix([[np.exp(-1j * alpha / 2), 0], [0, np.exp(1j * alpha / 2)]])
    
    # Variational ansatz --> Hardware efficient ansatz (HEA)
    def set_U_theta(self):
        U_theta = np.eye(2 ** self.n_qubits)
        for d in range(self.n_layers):
            R1 = 1
            R2 = 1
            R3 = 1
            for j in range(self.n_qubits):
                R1 = np.kron(R1, self.Rz(self.params[j + 3 * self.n_qubits * d]))
                R2 = np.kron(R2, self.Rx(self.params[j + self.n_qubits * (1 + 3 * d)]))
                R3 = np.kron(R3, self.Rz(self.params[j + self.n_qubits * (2 + 3 * d)]))

            C1 = 1
            C2 = np.eye(2)
            if self.n_qubits % 2 == 0:
                for j in range(int(self.n_qubits / 2) - 1):
                    C1 = np.kron(C1, self.CNOT)
                    C2 = np.kron(C2, self.CNOT)
                C1 = np.kron(C1, self.CNOT)
                C2 = np.kron(C2, np.eye(2))
            else:
                for j in range(int((self.n_qubits - 1) / 2)):
                    C1 = np.kron(C1, self.CNOT)
                    C2 = np.kron(C2, self.CNOT)
                C1 = np.kron(C1, np.eye(2))

            M = C2 @ C1 @ R3 @ R2 @ R1
            U_theta = M @ U_theta
        self.U_theta = U_theta
        
    # Encoding layer that uses x_i as angles to encode data.
    def single_encoding(self, qubits, x, J, sign):
        aux = 1
        for i in range(qubits):
            deltas = 0
            for j in range(len(J)):
                deltas += (i + 1 == J[j]) * sign[j]
            shift = deltas % 4
            s = self.phi(x, i + 1) + shift * np.pi / 2
            aux = np.kron(aux, self.Ry(s))
        return aux
    
    # Matrix U_phi
    def U_phi(self, Xi, J, sign, dvar):
        ti, xi = Xi
        assert self.n_qubits % 2 == 0

        if dvar == 0:
            U1 = np.kron(self.single_encoding(self.n_qubits // 2, ti, [], []),
                        self.single_encoding(self.n_qubits // 2, xi, [], []))
        elif dvar == 't':
            U1 = np.kron(self.single_encoding(self.n_qubits // 2, ti, J, sign),
                        self.single_encoding(self.n_qubits // 2, xi, [], []))
        elif dvar == 'x':
            U1 = np.kron(self.single_encoding(self.n_qubits // 2, ti, [], []),
                        self.single_encoding(self.n_qubits // 2, xi, J, sign))

        # U1 = np.kron(self.single_encoding(self.n_qubits // len(Xi), Xi[0], [], []),
        #              self.single_encoding(self.n_qubits // len(Xi), Xi[1], [], []))
        # for i in range(len(Xi) - 2):
        #     U1 = np.kron(U1, self.single_encoding(self.n_qubits // len(Xi), Xi[i + 2], [], []))
        return U1

    # State |psi > after U_phi
    def psi_phi(self, Xi, J, sign, dvar):
        return self.U_phi(Xi, J, sign, dvar)[:, 0]

    def create_initials(self, Xi):
        Psi0 = self.psi_phi(Xi, [], [], 0)

        Psi1_1 = []
        Psi1_2 = []
        for j in range(self.n_qubits):
            J = [j + 1]
            Psi1_1.append([self.psi_phi(Xi, J, [1], 't'), self.psi_phi(Xi, J, [1], 'x')])
            Psi1_2.append([self.psi_phi(Xi, J, [-1], 't'), self.psi_phi(Xi, J, [-1], 'x')])

        Psi2_1 = []
        Psi2_2 = []
        Psi2_3 = []
        Psi2_4 = []
        for j2 in range(self.n_qubits):
            for j1 in range(self.n_qubits):
                J = [j1 + 1, j2 + 1]
                Psi2_1.append([self.psi_phi(Xi, J, [1, 1], 't'), self.psi_phi(Xi, J, [1, 1], 'x')])
                Psi2_2.append([self.psi_phi(Xi, J, [-1, 1], 't'), self.psi_phi(Xi, J, [-1, 1], 'x')])
                Psi2_3.append([self.psi_phi(Xi, J, [1, -1], 't'), self.psi_phi(Xi, J, [1, -1], 'x')])
                Psi2_4.append([self.psi_phi(Xi, J, [-1, -1], 't'), self.psi_phi(Xi, J, [-1, -1], 'x')])

        return Psi0, Psi1_1, Psi1_2, Psi2_1, Psi2_2, Psi2_3, Psi2_4

    def initialize_grid(self, grid):
        initials = [[]] * len(grid)
        for i, Xi in enumerate(grid):
            initials[i] = list(self.create_initials(Xi))
        return initials

    def expectation(self, operator, state):
        exp = state.T.conj() @ operator @ state
        return exp.real

    def output(self, in_state):
        out_state = self.U_theta @ in_state
        return self.expectation(self.Hamiltonian_operator, out_state)[0, 0]

    def ps1(self, elem, dvar):
        count1 = 0
        if dvar == 't':
            dvar = 0
        elif dvar == 'x':
            dvar = 1

        Psi0, Psi1_1, Psi1_2, Psi2_1, Psi2_2, Psi2_3, Psi2_4 = self.initials[elem]
        k = 0
        for j in range(self.n_qubits):
            out1 = self.output(Psi1_1[k][dvar])
            out2 = self.output(Psi1_2[k][dvar])
            count1 += (j + 1) * (out1 - out2)
            k += 1
        return count1

    def ps2(self, elem, dvar):
        count2 = 0
        if dvar == 't':
            dvar = 0
        elif dvar == 'x':
            dvar = 1

        Psi0, Psi1_1, Psi1_2, Psi2_1, Psi2_2, Psi2_3, Psi2_4 = self.initials[elem]
        k = 0
        for j2 in range(self.n_qubits):
            for j1 in range(self.n_qubits):
                out1 = self.output(Psi2_1[k][dvar])
                out2 = self.output(Psi2_2[k][dvar])
                out3 = self.output(Psi2_3[k][dvar])
                out4 = self.output(Psi2_4[k][dvar])
                count2 += (j1 + 1) * (j2 + 1) * (out1 - out2 - out3 + out4)
                k += 1
        return count2

    def ui(self, elem):
        Psi0, Psi1_1, Psi1_2, Psi2_1, Psi2_2, Psi2_3, Psi2_4 = self.initials[elem]
        return self.output(Psi0)

    def uBC(self, elem):
        BC_Psi0, BC_Psi1_1, BC_Psi1_2, BC_Psi2_1, BC_Psi2_2, BC_Psi2_3, BC_Psi2_4 = self.BC_initials[elem]
        return self.output(BC_Psi0)

    def u_i(self, elem, dvar):
        if dvar == 't':
            return 1/2 * self.ps1(elem, dvar) * self.dphi1(self.Points[elem][0])
        elif dvar == 'x':
            return 1/2 * self.ps1(elem, dvar) * self.dphi1(self.Points[elem][1])

    def u_ii(self, elem, dvar):
        if dvar == 't':
            return self.ps2(elem, dvar) * 1/4 * (self.dphi1(self.Points[elem][0]))**2 + 1/2 * self.ps1(elem, dvar) * self.dphi2(self.Points[elem][0])
        elif dvar == 'x':
            return self.ps2(elem, dvar) * 1/4 * (self.dphi1(self.Points[elem][1]))**2 + 1/2 * self.ps1(elem, dvar) * self.dphi2(self.Points[elem][1])

    def set_lambda(self, weights):
        if weights == 0:
            lambda_f = (1/self.N) / (1/self.N + 2/self.t_points + 1/(len(self.BC_Data) - 2*self.t_points))
            lambda_bl = (1/self.t_points) / (1/self.N + 2/self.t_points + 1/(len(self.BC_Data) - 2*self.t_points))
            lambda_br = (1/self.t_points) / (1/self.N + 2/self.t_points + 1/(len(self.BC_Data) - 2*self.t_points))
            lambda_b0 = (1/(len(self.BC_Data) - 2*self.t_points)) / (1/self.N + 2/self.t_points + 1/(len(self.BC_Data) - 2*self.t_points))
        elif weights == 1:
            lambda_f = self.N / (self.N + 2*self.t_points + (len(self.BC_Data) - 2*self.t_points))
            lambda_bl = self.t_points / (self.N + 2*self.t_points + (len(self.BC_Data) - 2*self.t_points))
            lambda_br = self.t_points / (self.N + 2*self.t_points + (len(self.BC_Data) - 2*self.t_points))
            lambda_b0 = (len(self.BC_Data) - 2*self.t_points) / (self.N + 2*self.t_points + (len(self.BC_Data) - 2*self.t_points))
        return lambda_f, lambda_bl, lambda_br, lambda_b0
    
    def norm_inf(self, x, y):
        return np.max(np.abs(x - y)) ** 2

    def __call__(self, i):
        u = self.ui(i)
        ut = self.u_i(i, 't')
        ux = self.u_i(i, 'x')
        uxx = self.u_ii(i, 'x')
        return u, ut, ux, uxx

    def predict(self):
        # print('predict')
        u = np.zeros(self.N)
        ut = np.zeros(self.N)
        ux = np.zeros(self.N)
        uxx = np.zeros(self.N)
        for i, Xi in enumerate(self.Points):
            u[i], ut[i], ux[i], uxx[i] = self(i)
        return u, ut, ux, uxx
    
    def predict_BC(self):
        # print('predict_BC')
        u_bc = np.zeros(self.M)
        for i, Xi in enumerate(self.BC_Points):
            u_bc[i] = self.uBC(i)
        return u_bc

    def cost_theta(self, u, ut, ux, uxx, u_bc):
        # print('cost_theta')
        lambda_f, lambda_bl, lambda_br, lambda_b0 = self.set_lambda(self.weights)
        if self.error == 'MSE':  # Mean Squared Error
            norm = mean_squared_error
        elif self.error == 'MAE':  # Mean Absolute Error
            norm = mean_absolute_error
        elif self.error == 'MAX':  # Max absolute
            norm = self.norm_inf
        
        Fw = self.w0 * uxx - ut
        
        function_loss = norm(Fw, np.zeros_like(Fw))
        L_diff = lambda_f * function_loss
        L_Xl = lambda_bl * norm(u_bc[0:self.t_points], self.BC_Data[0:self.t_points])
        L_Xr = lambda_br * norm(u_bc[self.t_points:2 * self.t_points], self.BC_Data[self.t_points:2 * self.t_points])
        L_T0 = lambda_b0 * norm(u_bc[2 * self.t_points:len(u_bc)], self.BC_Data[2 * self.t_points:len(self.BC_Data)])
        
        cost = L_diff + L_Xl + L_Xr + L_T0
        self.L_cost = np.array([cost, L_diff, L_Xl, L_Xr, L_T0])
        print(cost)
        return cost

    def cost_w(self, u):
        # print('cost_w')
        data_loss = mean_squared_error(u, self.Y_noisy[0])
        return data_loss

    def loss_theta(self, theta):
        # print('loss_theta')
        # print(theta)
        self.set_params(theta)
        u, ut, ux, uxx = self.predict()
        u_bc = self.predict_BC()
        return self.cost_theta(u, ut, ux, uxx, u_bc)

    def loss_w(self, w):
        # print('loss_w')
        self.w = w
        optimal_theta = self.fit_theta(inplace=True)
        self.set_params(optimal_theta)
        u, ut, ux, uxx = self.predict()
        self.L_w.append(self.cost_w(u))
        self.ws.append(self.w)
        
        return self.cost_w(u)

    def callback_theta(self, theta):
        print('Loss function:', self.loss_theta(theta))
        

    def fit_theta(self, inplace=True):
        old_parameters = np.copy(self.params)
        # "L-BFGS-B"
        optimal_theta = minimize(self.loss_theta, self.params, method="Nelder-Mead",
                                bounds=Bounds(lb=0, ub=2 * np.pi),
                                callback=self.callback_theta(self.params))
        
        if inplace:
            self.params = np.copy(optimal_theta.x)
        else:
            self.params = np.copy(old_parameters)
        
        return optimal_theta.x

    def fit_w(self):
        print('fit_w')
        optimal_w = minimize_scalar(self.loss_w)
        return optimal_w.x




In [4]:
n_qubits = 4 # number of qubits
n_layers = 5  # number of layers
error = 'MSE'  # error
weights = 1 # 0 or 1 ?

pqc = PQC(n_qubits, n_layers, error, weights)
pqc.load_grid()
print('start')
# pqc.save_params('params.pkl')
print(pqc.fit_w())

start
fit_w
3.033632752364917
3.0426372121097445
3.17850753318804
2.8638875196599174
4.267074996314873
3.1528010225464946
3.091649414906848
3.0632906594442892
2.4475225871864543
3.3585904611952437
3.58526805575027
3.035821050564468
4.270930798581478
3.491065671151896
2.9496637091742524
2.967267105582299
2.8859567609801564
2.9114401473718723
3.9553697042740374
3.067973026955872
3.0882634083886193
2.8287753220875143
3.074663488718382
3.0875912308195295
2.8374437248811795
2.8858825150864917
3.7012108095635505
2.8739851702286447
4.280432530921989
3.4077513322058968
2.9244128837723435
3.248675126869588
3.1725055947713763
3.014516491770298
3.6860505397663994
3.5010962803676358
4.017721705597718
2.937641098828649
3.037071155925807
2.5142275515293377
3.219214049267374
3.1096499234951818
3.0055301335207694
3.0376292409775427
4.475556400095724
3.0336327523649187
3.033632752364919
2.8628445821505104
3.2346119386609904
3.033632752364916
3.0336327523649183
2.629075027462237
3.5956617876488726
3.033

KeyboardInterrupt: 