<a href="https://colab.research.google.com/github/jiachengma/NNProperty/blob/main/PINN_property_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install CoolProp

In [None]:
import torch
import torch.nn as nn
from torch.nn import MSELoss
import matplotlib.pyplot as plt
import numpy as np
from CoolProp.CoolProp import PropsSI
import CoolProp as CP
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import root_mean_squared_error, mean_absolute_percentage_error
from pickle import dump, load

In [None]:
# check device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
class PINNmodel(object):
    """
    Physics-informed neural networks for density, temperature, specific heat, and density partial derivatives w.r.t. (p, h)
    """
    def __init__(self, medium, p_max, p_min, h_max, h_min, net, normalizer_input=None, normalizer_output=None):
        self.medium = medium
        self.p_max = p_max
        self.p_min = p_min
        self.h_max = h_max
        self.h_min = h_min
        self.net = net.to(device)
        self.normalizer_input = normalizer_input
        self.normalizer_output = normalizer_output


    def generateSatData(self, n=100, phase='bubble'):
        p = np.random.uniform(self.p_min, self.p_max, n) # input pressure
        if phase == 'bubble':
          q = 0
        elif phase == 'dew':
          q = 1
        else:
          print('unknown phase')
        HEOS = CP.AbstractState("HEOS", self.medium)
        output = np.zeros((n, 4)) # d, h, T, cp
        deriv = np.zeros((n, 2)) # dddp, dhdp
        for k in range(n):
          HEOS.update(CP.PQ_INPUTS, p[k], q)
          dddp = HEOS.first_saturation_deriv(CP.iDmass, CP.iP)
          dhdp = HEOS.first_saturation_deriv(CP.iHmass, CP.iP)
          output[k] = np.array([HEOS.rhomass(), HEOS.hmass(), HEOS.T(), HEOS.cpmass()])
          deriv[k] = np.array([dddp, dhdp])
        return (p, output, deriv)

    def generateSinglePhaseData(self, n_p, n_h, phase='liquid'):
        """
        Generate single-phase training data
        n_p     -- number of pressure samples
        n_h     -- number of enthalpy samples per pressure
        phase   -- liquid or vapor
        """

        # p_range = np.random.uniform(self.p_min, self.p_max, n_p)
        # p_data = np.kron(p_range.reshape(-1,1), np.ones((1, n_h)))
        # if phase == 'liquid':
        #   h_f = PropsSI('Hmass', 'P', p_range, 'Q', 0, self.medium)
        #   h_data = np.tile(h_f-self.h_min, (n_h,1)).T * np.random.rand(n_p, n_h) + self.h_min
        # elif phase == 'vapor':
        #   h_g = PropsSI('Hmass', 'P', p_range, 'Q', 1, self.medium)
        #   h_data = self.h_max - np.tile(self.h_max - h_g, (n_h,1)).T * np.random.rand(n_p, n_h)
        # else:
        #   print('unknown phase')
        # input_ph = np.stack((p_data.flatten(), h_data.flatten()), axis=1)

        p_data = np.random.uniform(self.p_min, self.p_max, (n_p, n_h)).flatten()
        h_data = np.random.uniform(self.h_min, self.h_max, (n_p, n_h)).flatten()

        if phase == 'liquid':
          h_f = PropsSI('Hmass', 'P', p_data, 'Q', 0, self.medium)
          h_liq_idx = np.where(h_data<h_f)[0]
          input_ph = np.stack((p_data[h_liq_idx], h_data[h_liq_idx]), axis=1)
        elif phase == 'vapor':
          h_g = PropsSI('Hmass', 'P', p_data, 'Q', 1, self.medium)
          h_vap_idx = np.where(h_data>h_g)[0]
          input_ph = np.stack((p_data[h_vap_idx], h_data[h_vap_idx]), axis=1)
        else:
          print('unknown phase')
        print(input_ph.shape)
        HEOS = CP.AbstractState("HEOS", self.medium)
        output = np.zeros((input_ph.shape[0], 2)) # d, T
        deriv = np.zeros((input_ph.shape[0], 3)) # dddp_h, dddh_p, cp
        for k in range(output.shape[0]):
          HEOS.update(CP.HmassP_INPUTS, input_ph[k,1], input_ph[k,0])
          dddp = HEOS.first_partial_deriv(CP.iDmass, CP.iP, CP.iHmass)
          dddh = HEOS.first_partial_deriv(CP.iDmass, CP.iHmass, CP.iP)
          dTdh = HEOS.first_partial_deriv(CP.iT, CP.iHmass, CP.iP)
          output[k] = np.array([HEOS.rhomass(), HEOS.T()])
          deriv[k] = np.array([dddp, dddh, HEOS.cpmass()])
        return (input_ph, output, deriv)

    def grad(self, input, output):
      return torch.autograd.grad(output, input, grad_outputs=torch.ones_like(output).to(device), create_graph=True)[0]

    def loss_sp(self, criterion, input, output, deriv):
      """
      Single phase loss function. Note: predictions are normalized values
      """
      input.requires_grad = True
      y = self.net(input)
      drho = self.grad(input, y[:,0])
      dT = self.grad(input, y[:,1])
      loss_prop = criterion(y, output)
      # deriv_pred = torch.hstack((drho, dT[:,1:2]))
      # loss_deriv = criterion(deriv_pred, deriv)
      loss_deriv = 8*criterion(drho[:,0:1], deriv[:,0:1]) + criterion(drho[:,1:2], deriv[:,1:2]) + 5*criterion(dT[:,1:2], deriv[:,2:])
      return loss_prop, loss_deriv


    def loss_sat_p(self, criterion, input, output, deriv):
      """
      Saturated loss function for pressure input. Note: predictions are normalized values
      """
      input.requires_grad = True
      y = self.net(input)
      dddp = self.grad(input, y[:,0])
      dhdp = self.grad(input, y[:,1])
      deriv_pred = torch.hstack((dddp, dhdp))
      loss_prop = criterion(y, output)
      loss_deriv = criterion(deriv_pred, deriv)
      return loss_prop, loss_deriv


    def train_pinn(self, epochs, loss_f, train_input, train_output, train_deriv, alpha):
      """train single phase media model"""
      criterion = MSELoss()
      loss_record = []

      optimizer = torch.optim.LBFGS(self.net.parameters(),
                                    lr=5,
                                    max_iter=100,
                                    history_size=100,
                                    line_search_fn='strong_wolfe')
      def closure():
        optimizer.zero_grad()
        loss_prop, loss_deriv = loss_f(criterion, train_input, train_output, train_deriv)
        l = loss_prop + alpha*loss_deriv
        l.backward()
        return l

      self.net.train()
      for iter in range(epochs):
        l= optimizer.step(closure)
        loss_record.append(l.item())

        print(f'Epoch {iter+1} Loss: {l.item()}')
      fig, ax = plt.subplots()
      ax.plot(range(epochs), np.log(loss_record), label='training loss', linewidth=3)
      ax.x_label = 'epochs'
      ax.y_label = 'log(loss)'
      ax.legend()

    def normalize_sp(self, input, output, deriv):
      """ normalize single phase data"""
      # input_log = np.stack((np.log(input[:,0]),input[:,1]), axis=1)
      input_norm = self.normalizer_input.fit_transform(input)
      output_norm = self.normalizer_output.fit_transform(output)
      p_scale, h_scale = self.normalizer_input.scale_
      d_scale, T_scale = self.normalizer_output.scale_
      dddp_norm = deriv[:,0] * d_scale / p_scale
      dddh_norm = deriv[:,1] * d_scale / h_scale
      dTdh_norm = T_scale / h_scale / deriv[:,2]
      deriv_norm = np.stack((dddp_norm, dddh_norm, dTdh_norm), axis=1)

      input_norm = torch.from_numpy(input_norm).float().to(device)
      output_norm = torch.from_numpy(output_norm).float().to(device)
      deriv_norm = torch.from_numpy(deriv_norm).float().to(device)
      return (input_norm, output_norm, deriv_norm)

    def normalize_sat_p(self, input, output, deriv):
      """normalize saturated data with p input"""
      input_norm = self.normalizer_input.fit_transform(input[:,None])
      output_norm = self.normalizer_output.fit_transform(output)
      p_scale = self.normalizer_input.scale_
      d_scale, h_scale, T_scale, cp_scale = self.normalizer_output.scale_
      dddp_norm = deriv[:,0] * d_scale / p_scale
      dhdp_norm = deriv[:,1] * h_scale / p_scale
      deriv_norm = np.stack((dddp_norm, dhdp_norm), axis=1)

      input_norm = torch.from_numpy(input_norm).float().to(device)
      output_norm = torch.from_numpy(output_norm).float().to(device)
      deriv_norm = torch.from_numpy(deriv_norm).float().to(device)
      return (input_norm, output_norm, deriv_norm)

    def train_singlePhase(self, n_p=10, n_h=10, phase='liquid', epochs=10, alpha=1):
      if phase not in ['liquid', 'vapor']:
          raise Exception("phase input should be liquid or vapor")
      # generate data
      input, output, deriv = self.generateSinglePhaseData(n_p, n_h, phase)
      input_norm, output_norm, deriv_norm = self.normalize_sp(input, output, deriv)
      # training
      self.train_pinn(epochs, self.loss_sp, input_norm, output_norm, deriv_norm, alpha)

    def train_sat_p(self, n, phase='bubble', epochs=10, alpha=1):
      if phase not in ['bubble', 'dew']:
          raise Exception("phase input should be 'bubble' or 'dew'")
      # generate data
      input, output, deriv = self.generateSatData(n, phase)
      input_norm, output_norm, deriv_norm = self.normalize_sat_p(input, output, deriv)
      # training
      self.train_pinn(epochs, self.loss_sat_p, input_norm, output_norm, deriv_norm, alpha)

    def plot_validate(self, data_true, data_pred, titles=None):
      n = data_true.shape[1]
      fig, ax = plt.subplots(n, 1, figsize=(15, 20))
      for i in range(n):
        ax[i].plot(data_true[:,i], data_pred[:,i], '.', markersize=4)
        y_min = data_true[:,i].min()
        y_max = data_true[:,i].max()
        ax[i].plot(np.linspace(y_min, y_max), np.linspace(y_min, y_max))
        if titles:
          ax[i].set_title(titles[i])

    def validate_singlePhase(self, n_p=10, n_h=10, phase='liquid', criterion=None, plot=1):
      input, output, deriv = self.generateSinglePhaseData(n_p, n_h, phase)
      # input_log = np.stack((np.log(input[:,0]),input[:,1]), axis=1)
      input_norm = torch.from_numpy(self.normalizer_input.transform(input)).float().to(device)
      input_norm.requires_grad = True
      self.net.eval()
      output_norm = self.net(input_norm)
      p_scale, h_scale = self.normalizer_input.scale_
      d_scale, T_scale = self.normalizer_output.scale_
      dd_pred_norm = self.grad(input_norm, output_norm[:,0])
      dT_pred_norm = self.grad(input_norm, output_norm[:,1])
      output_pred = self.normalizer_output.inverse_transform(output_norm.detach().cpu().numpy())
      dddp_pred = dd_pred_norm[:,0].detach().cpu().numpy() / d_scale * p_scale
      dddh_pred = dd_pred_norm[:,1].detach().cpu().numpy() / d_scale * h_scale
      cp_pred = 1 / (dT_pred_norm[:,1].detach().cpu().numpy()) * T_scale / h_scale
      error_output = criterion(output, output_pred, multioutput='raw_values')
      deriv_pred = np.stack((dddp_pred, dddh_pred, cp_pred), axis=1)
      error_deriv = criterion(deriv, deriv_pred, multioutput='raw_values')
      print(f'MAPE for d, T: {error_output}, MAPE for dddp, dddh, cp: {error_deriv}')
      titles = ['d', 'T', 'dddp_h', 'dddh_p', 'cp']
      if plot:
        self.plot_validate(np.hstack((output, deriv)), np.hstack((output_pred, deriv_pred)), titles)
      return (error_output, error_deriv)

    def validate_sat(self, n, phase, criterion=None, plot=1):
      input, output, deriv = self.generateSatData(n, phase)
      input_norm = torch.from_numpy(self.normalizer_input.transform(input[:,None])).float().to(device)
      input_norm.requires_grad = True
      self.net.eval()
      output_norm = self.net(input_norm)
      p_scale = self.normalizer_input.scale_
      d_scale, h_scale, T_scale, cp_scale = self.normalizer_output.scale_
      dddp_pred_norm = self.grad(input_norm, output_norm[:,0])
      dhdp_pred_norm = self.grad(input_norm, output_norm[:,1])
      dddp_pred = dddp_pred_norm.detach().cpu().numpy() / d_scale * p_scale
      dhdp_pred = dhdp_pred_norm.detach().cpu().numpy() / h_scale * p_scale
      deriv_pred = np.hstack((dddp_pred, dhdp_pred))
      output_pred = self.normalizer_output.inverse_transform(output_norm.detach().cpu().numpy())
      error_output = criterion(output, output_pred, multioutput='raw_values')
      error_deriv = criterion(deriv, deriv_pred, multioutput='raw_values')
      print(f'MAPE for d, h, T, cp: {error_output}, MAPE for dddp, dhdp: {error_deriv}')
      titles = ['d', 'h', 'T', 'cp', 'dddp', 'dhdp']
      if plot:
        self.plot_validate(np.hstack((output, deriv)), np.hstack((output_pred, deriv_pred)), titles)
      return (error_output, error_deriv)

    def save_model(self, model_name):
      torch.save(self.net, model_name+'.pth') # save torch model
      # save normalize
      dump(self.normalizer_input, open('normalizer_input.pkl', 'wb'))
      dump(self.normalizer_output, open('normalizer_output.pkl', 'wb'))




In [None]:
medium = 'R410A'
p_crit = PropsSI('Pcrit', medium)
p_min = 1e5
p_max = p_crit * 0.95
h_min = 1.1e5
h_max = 4.9e5

N = 10
n_input = 2
n_output = 2
phase = 'liquid'
net = nn.Sequential(nn.Linear(n_input, N),
                    nn.Sigmoid(),
                    nn.Linear(N, N),
                    nn.Sigmoid(),
                    nn.Linear(N, n_output))
model = PINNmodel(medium, p_max, p_min, h_max, h_min, net, MinMaxScaler(), MinMaxScaler())
model.train_singlePhase(200, 200, phase, 100, 15)


# model.train_sat_p(20000, phase, 200, 5)
# _ = model.validate_sat(20000, phase, mean_absolute_percentage_error)


In [None]:
_ = model.validate_singlePhase(700, 700, phase, root_mean_squared_error)

In [None]:
# save model
model.save_model('R410a_liquid')

In [None]:
for name, param in model.net.named_parameters():
    print(f"Layer: {name} | Size: {param.size()} | Values: {param}")

### Load network weights

In [None]:
# file_path = 'R410a_liquid.pth'
# model = torch.load(file_path)
model_weights = model.net.state_dict()
weights = {k: v.detach().cpu().numpy() for k, v in model_weights.items()}

# Save weights to a text file
with open('weights.txt', 'w') as f:
    for key, value in weights.items():
        f.write(f"{key}: {value.tolist()}\n")

In [None]:
model.normalizer_input.scale_

In [None]:
model.normalizer_input.min_

In [None]:
model.normalizer_output.scale_

In [None]:
model.normalizer_output.min_

In [None]:
normalizer_input = load(open('normalizer_input.pkl', 'rb'))
normalizer_output = load(open('normalizer_output.pkl', 'rb'))

In [None]:
file_path = 'R410a_liquid.pth'
net = torch.load(file_path)

In [None]:
print(net)

In [None]:
model = PINNmodel(medium, p_max, p_min, h_max, h_min, net, normalizer_input, normalizer_output)

In [None]:
_ = model.validate_singlePhase(800, 800, phase, root_mean_squared_error)

In [None]:
normalizer_output.min_