In [None]:
import sys
sys.path.insert(0, '../Utilities/')

import torch
from collections import OrderedDict
import numpy as np
import matplotlib.pyplot as plt
import scipy.io
from scipy.interpolate import griddata
from mpl_toolkits.axes_grid1 import make_axes_locatable
import warnings

warnings.filterwarnings('ignore')

np.random.seed(1234)

In [None]:

torch.cuda.empty_cache()

In [None]:
# CUDA support 
if torch.cuda.is_available():
    device = torch.device('cuda')
    print("gpu")
else:
    device = torch.device('cpu')

In [None]:
# the deep neural network
class SineLayer(torch.nn.Module):
    def __init__(self, w0):
        super(SineLayer, self).__init__()
        self.w0 = w0    # this is the frequency of the sine wave

    def forward(self,x):
        return torch.sin(self.w0*x)
    
class DNN(torch.nn.Module):
    def __init__(self, layers):
        super(DNN, self).__init__()
        
        # parameters
        self.depth = len(layers) - 1
        
        # set up layer order dict
        # self.activation = SineLayer(3)
        
        layer_list = list()
        for i in range(self.depth - 1): 
            layer_list.append(
                ('layer_%d' % i, torch.nn.Linear(layers[i], layers[i+1]))
            )
            layer_list.append(('activation_%d' % i, SineLayer(3)))
            
        layer_list.append(
            ('layer_%d' % (self.depth - 1), torch.nn.Linear(layers[-2], layers[-1]))
        )
        layerDict = OrderedDict(layer_list)
        
        # deploy layers
        self.layers = torch.nn.Sequential(layerDict)
        
    def forward(self, x):
        out = self.layers(x)
        return out

In [None]:
# the physics-guided neural network
def calc_grad(y, x) -> torch.Tensor:
        grad = torch.autograd.grad(
            outputs=y,
            inputs=x,
            grad_outputs=torch.ones_like(y),
            create_graph=True,
            retain_graph=True,
        )[0]
        return grad

        
class PhysicsInformedNN():
    def __init__(self, X, u, Xt, X_i, X_f, layers):
    
        
        # data
        self.x = torch.tensor(X[:, 0], requires_grad=True).float().to(device)
        self.y = torch.tensor(X[:, 1], requires_grad=True).float().to(device)
        self.t = torch.tensor(X[:, 2], requires_grad=True).float().to(device)
        self.u = torch.tensor(u).float().to(device)


        self.x_lr = torch.tensor(X_f[:, 0], requires_grad=True).float().to(device)
        self.y_lr = torch.tensor(X_f[:, 1], requires_grad=True).float().to(device)
        self.t_lr = torch.tensor(X_f[:, 2], requires_grad=True).float().to(device)

        self.x_tb = torch.tensor(X_i[:, 0], requires_grad=True).float().to(device)
        self.y_tb = torch.tensor(X_i[:, 1], requires_grad=True).float().to(device)
        self.t_tb = torch.tensor(X_i[:, 2], requires_grad=True).float().to(device)
        
        self.x_t = torch.tensor(Xt[:, 0], requires_grad=True).float().to(device)
        self.y_t = torch.tensor(Xt[:, 1], requires_grad=True).float().to(device)
        self.t_t = torch.tensor(Xt[:, 2], requires_grad=True).float().to(device)

        self.dnn = DNN(layers).to(device)

        self.optimizer = torch.optim.LBFGS(
            self.dnn.parameters(), 
            lr=1.0, 
            max_iter=50000, 
            max_eval=50000, 
            history_size=50,
            tolerance_grad=1e-6, 
            tolerance_change=1.0 * np.finfo(float).eps,
            line_search_fn="strong_wolfe"    
        )
        
        self.optimizer_Adam = torch.optim.Adam(self.dnn.parameters())
        
    def net_u(self, x, y, t):  
        u = self.dnn(torch.stack([x, y, t], dim=1))
        return u
    
    def RH_l(self, p1, p2, u1, u2, v1, v2):
        if torch.norm(p1 - p2) > 0.2 and torch.norm((u1-u2)**2+(v1-v2)**2) > 0.04:
            return torch.norm((p1 - p2)*((u1-u2)**2+(v1-v2)**2))
        else:
            return 0
        
    
    def net_f_rh(self, x, y, t, dx, dy):
        

        gamma = 1.4
        hidden_output = self.net_u(x, y, t)
        hidden_output2 = self.net_u(x - dx, y, t)
        hidden_output3 = self.net_u(x, y - dy, t)
        rho_1 = hidden_output[:, 0]
        u_1 = hidden_output[:, 1]
        v_1 = hidden_output[:, 2]
        p_1 = hidden_output[:, 3]
        rho_2 = hidden_output2[:, 0]
        u_2 = hidden_output2[:, 1]
        v_2 = hidden_output2[:, 2]
        p_2 = hidden_output2[:, 3]
        rho_3 = hidden_output3[:, 0]
        u_3 = hidden_output3[:, 1]
        v_3 = hidden_output3[:, 2]
        p_3 = hidden_output3[:, 3]

        V2_1 = u_1 ** 2 + v_1 **2
        E_1 = (p_1 / (gamma - 1)) + (1/2 * rho_1 * V2_1)
        V2_2 = u_2 ** 2 + v_2 **2
        E_2 = (p_2 / (gamma - 1)) + (1/2 * rho_2 * V2_2)
        V2_3 = u_3 ** 2 + v_3 **2
        E_3 = (p_3 / (gamma - 1)) + (1/2 * rho_3 * V2_3)
        e_1 = E_1 - (1/2 * rho_1 * V2_1)
        e_2 = E_2 - (1/2 * rho_2 * V2_2)
        e_3 = E_3 - (1/2 * rho_3 * V2_3)
        
        f_1_x = rho_1*rho_2*((u_1 - u_2) ** 2 + (v_1 - v_2) ** 2) - (p_1 - p_2)*(rho_1 - rho_2)
        f_2_x = e_1*rho_2 + rho_1*e_2 - (1/2*(p_1 + p_2)*(rho_1 - rho_2))
        f_1_y = rho_1*rho_3*((u_1 - u_3) ** 2 + (v_1 - v_3) ** 2) - (p_1 - p_3)*(rho_1 - rho_3)
        f_2_y = e_1*rho_3 + rho_1*e_3 - (1/2*(p_1 + p_3)*(rho_1 - rho_3))
        l_x = self.RH_l(p_1, p_2, u_1, u_2, v_1, v_2)
        l_y = self.RH_l(p_1, p_3, u_1, u_3, v_1, v_3)
        del hidden_output, rho_1, v_1, u_1, p_1, e_1, E_1, V2_1
        del hidden_output2, rho_2, v_2, u_2, p_2, e_2, E_2, V2_2
        del hidden_output3, rho_3, v_3, u_3, p_3, e_3, E_3, V2_3
        return torch.mul(l_x, f_1_x), torch.mul(l_x, f_2_x), torch.mul(l_y, f_1_y), torch.mul(l_y, f_2_y)
    
    # def loss_con(self, x_en,x_in, t_en, t_in, crhoL,cuL,cpL,crhoR,cuR,cpR,tf):
    #     y_en = self.net_u(x_en, t_en)                                      
    #     y_in = self.net_u(x_in, t_in)                                     
    #     rhoen, pen,uen = y_en[:, -1], y_en[:, 2], y_en[:, 1]         
    #     rhoin, pin,uin = y_in[:, -1], y_in[:, 2], y_in[:, 1]         

    #     U2en = 0.5*rhoen*uen**2 + pen/0.4
    #     U2in = 0.5*rhoin*uin**2 + pin/0.4
    #     gamma = 0.4
    #     cU2L = 0.5*crhoL*cuL**2 + cpL/0.4 
    #     cU2R = 0.5*crhoR*cuR**2 + cpR/0.4 
    #     # Loss function for the initial condition
    #     loss_en = (torch.mean(rhoen - rhoin) - tf*(crhoL*cuL-crhoR*cuR))**2+ \
    #         (torch.mean(-U2en+ U2in) + tf*(cU2L*cuL - cU2R*cuR) + (cpL*cuL - cpR*cuR)*tf )**2 +\
    #         (torch.mean(-rhoen*uen + rhoin*uin)+(cpL-cpR)*tf +(crhoL*cuL*cuL-crhoR*cuR*cuR)*tf)**2
    #     return loss_en 
    
    def net_f_x(self, x, y, t):

        gamma = 1.4
        hidden_output = self.net_u(x, y, t)
        rho_pred = hidden_output[:, 0]
        u_pred = hidden_output[:, 1]
        v_pred = hidden_output[:, 2]
        p_pred = hidden_output[:, 2]

        rho_x = calc_grad(rho_pred, x)
        p_x = calc_grad(p_pred, x)
        u_x, v_x = calc_grad(u_pred, x), calc_grad(v_pred, x)
        del rho_pred, u_pred, v_pred, p_pred, hidden_output
        return rho_x, u_x, v_x, p_x
    
    def net_f_y(self, x, y, t):

        gamma = 1.4
        hidden_output = self.net_u(x, y, t)
        rho_pred = hidden_output[:, 0]
        u_pred = hidden_output[:, 1]
        v_pred = hidden_output[:, 2]
        p_pred = hidden_output[:, 2]

        rho_x = calc_grad(rho_pred, y)
        p_x = calc_grad(p_pred, y)
        u_x, v_x = calc_grad(u_pred, y), calc_grad(v_pred, y)
        del rho_pred, u_pred, v_pred, p_pred, hidden_output
        return rho_x, u_x, v_x, p_x
     
    def net_f(self, x, y, t):

        gamma = 1.4
        hidden_output = self.net_u(x, y, t)
        rho_pred = hidden_output[:, 0]
        u_pred = hidden_output[:, 1]
        v_pred = hidden_output[:, 2]
        p_pred = hidden_output[:, 3]

        V2_mag = u_pred ** 2 + v_pred ** 2
        E_pred = (p_pred / (gamma - 1)) + (1/2 * rho_pred * V2_mag)
        rho_t = calc_grad(rho_pred, t)
        p_x = calc_grad(p_pred, x)
        p_y = calc_grad(p_pred, y)
        # u_x, v_y = calc_grad(u_pred, x), calc_grad(v_pred, y)
        f_rho = ( 
            rho_t + calc_grad(rho_pred * u_pred, x) + calc_grad(rho_pred * v_pred, y)
            )
        f_u = (
            calc_grad(rho_pred * u_pred, t) + 
            calc_grad(rho_pred * torch.square(u_pred), x) + p_x
            + calc_grad(rho_pred * v_pred * u_pred, y)
        )
        f_v = (
            calc_grad(rho_pred * v_pred, t) + calc_grad(rho_pred * torch.square(v_pred), y) + p_y
            + calc_grad(rho_pred * v_pred * u_pred, x)
        )
        f_e = (
            calc_grad(E_pred, t) +  calc_grad(u_pred * (E_pred + p_pred), x)
            + calc_grad(v_pred * (E_pred + p_pred), y)
        )
        del rho_pred, u_pred, v_pred, p_pred, E_pred, hidden_output, V2_mag, rho_t, p_x, p_y
        # l = (1/ ( 0.2*(torch.norm(u_x + v_y) - (u_x + v_y) ) + 1))
        return f_rho, f_u, f_v, f_e
    
    def loss_func(self):
        u_pred = self.net_u(self.x, self.y, self.t)
        # rho_y_tb, u_y_tb, v_y_tb, p_y_tb = self.net_f_y(self.x_tb, self.y_tb, self.t_tb)
        # rho_x_lr, u_x_lr, v_x_lr, p_x_lr = self.net_f_x(self.x_lr, self.y_lr, self.t_lr)
        dx = 1/100
        dy = 1/100
        # loss_con = self.loss_con(self.x_en,self.x_in, self.y_en, self.y_in, 1,0,1,0.125,0,0.1,0.2)
        f_rho_pred, f_u_pred, f_v_pred, f_e_pred = self.net_f(self.x_t, self.y_t, self.t_t)
        f_1_x, f_2_x, f_1_y, f_2_y = self.net_f_rh(self.x_t, self.y_t, self.t_t, dx, dy)
        loss = 10*(torch.mean((self.u[:, 0] - u_pred[:, 0]) ** 2) + torch.mean((self.u[:, 1] - u_pred[:, 1]) ** 2) + \
                torch.mean((self.u[:, 2] - u_pred[:, 2]) ** 2) + torch.mean((self.u[:, 3] - u_pred[:, 3]) ** 2) + \
                # torch.mean((rho_y_tb + u_y_tb + v_y_tb + p_y_tb) ** 2) + \
                # torch.mean((rho_x_lr + u_x_lr + v_x_lr + p_x_lr) ** 2) + \
                torch.mean(f_1_x ** 2) + torch.mean(f_2_x ** 2) + \
                torch.mean(f_1_y ** 2) + torch.mean(f_2_y ** 2)) + \
            (torch.mean(f_rho_pred ** 2) + torch.mean(f_u_pred ** 2) + \
                torch.mean(f_v_pred ** 2) + torch.mean(f_e_pred ** 2)) 
    
        self.optimizer.zero_grad()
        loss.backward()

        print('Loss: %e' % (loss.item()))
        del u_pred, f_rho_pred, f_u_pred, f_v_pred, f_e_pred, f_1_x, f_2_x, f_1_y, f_2_y
        return loss
    
    def train(self, nIter):
        self.dnn.train()
        # for epoch in range(nIter):
        #     u_pred = self.net_u(self.x, self.y, self.t)
        #     rho_y_tb, u_y_tb, v_y_tb, p_y_tb = self.net_f_y(self.x_tb, self.y_tb, self.t_tb)
        #     rho_x_lr, u_x_lr, v_x_lr, p_x_lr = self.net_f_x(self.x_lr, self.y_lr, self.t_lr)
        #     dx = 1/100
        #     dy = 1/100
        #     # loss_con = self.loss_con(self.x_en,self.x_in, self.y_en, self.y_in, 1,0,1,0.125,0,0.1,0.2)
        #     f_rho_pred, f_u_pred, f_v_pred, f_e_pred, l = self.net_f(self.x_t, self.y_t, self.t_t)
        #     f_1_x, f_2_x, f_1_y, f_2_y = self.net_f_rh(self.x_t, self.y_t, self.t_t, dx, dy)
        #     loss = 10*(torch.mean((self.u[:, 0] - u_pred[:, 0]) ** 2) + torch.mean((self.u[:, 1] - u_pred[:, 1]) ** 2) + \
        #             torch.mean((self.u[:, 2] - u_pred[:, 2]) ** 2) + torch.mean((self.u[:, 3] - u_pred[:, 3]) ** 2) + \
        #             torch.mean((rho_y_tb + u_y_tb + v_y_tb + p_y_tb) ** 2) + \
        #             torch.mean((rho_x_lr + u_x_lr + v_x_lr + p_x_lr) ** 2) + \
        #             torch.mean(f_1_x ** 2) + torch.mean(f_2_x ** 2) + \
        #             torch.mean(f_1_y ** 2) + torch.mean(f_2_y ** 2)) + \
        #         (torch.mean(torch.mul(l, f_rho_pred) ** 2) + torch.mean(torch.mul(l, f_u_pred) ** 2) + \
        #          torch.mean(torch.mul(l, f_v_pred) ** 2) + torch.mean(torch.mul(l, f_e_pred) ** 2)) 
            
        #     # Backward and optimize
        #     self.optimizer_Adam.zero_grad()
        #     loss.backward()
        #     self.optimizer_Adam.step()
            
        #     if epoch % 100 == 0:
        #         print(
        #             'It: %d, Loss: %.3e' % 
        #             (
        #                 epoch, 
        #                 loss.item(), 
        #             )
        #         )
        #     del u_pred, rho_y_tb, u_y_tb, v_y_tb, p_y_tb, rho_x_lr, u_x_lr, v_x_lr, p_x_lr, f_rho_pred, f_u_pred, f_v_pred, f_e_pred, f_1_x, f_2_x, f_1_y, f_2_y, l
        self.optimizer.step(self.loss_func)
                
    
    def predict(self, X):
        x = torch.tensor(X[:, 0], requires_grad=True).float().to(device)
        y = torch.tensor(X[:, 1], requires_grad=True).float().to(device)
        t = torch.tensor(X[:, 2], requires_grad=True).float().to(device)
        self.dnn.eval()
        psi_and_p = self.net_u(x, y, t)
        rho_pred = psi_and_p[:, 0]
        u_pred = psi_and_p[:, 1]
        v_pred = psi_and_p[:, 2]
        p_pred = psi_and_p[:, 3]
        return rho_pred, u_pred, v_pred, p_pred

## Configurations

In [None]:
layers = [3, 50, 50, 50, 50, 50, 50, 50, 4]

n = 101
T = 21
x = np.linspace(0, 1, n)
y = np.linspace(0, 1, n)

MG1 = np.meshgrid(x, y)
xf = MG1[0].flatten()
yf = MG1[1].flatten()

X_test = np.vstack([xf, yf]).T      
print(X_test.shape)

In [None]:
N = n*n


xx = np.array(x).reshape(1, n)
a = np.tile(xx, n).T

yy = np.array(y).reshape(1, n)
s = np.tile(yy.T, n).flatten()[None, :].T
t = np.linspace(0, 0.4, T)
f = t.reshape(1, T)
t = t.reshape(-1)

# Rearrange Data
XX = np.tile(a, (1, T))  # N x T
YY = np.tile(s, (1, T))  # N x T
TT = np.tile(f, (N, 1))  # N x T

xx = XX.flatten()[:, None]  # NT x 1
yy = YY.flatten()[:, None]  # NT x 1
tt = TT.flatten()[:, None]  # NT x 1

X_test = np.hstack((xx, yy, tt))
print(X_test.shape)

In [None]:
import math
u_it = []
x_i = []
wall = []
x_lr = []
x_tb = []
x_ft = []
x_it = []
for i in range(len(X_test)):
    if X_test[i][0] == 0 or X_test[i][0] == 1:
          x_lr.append(X_test[i])
    if X_test[i][1] == 0 or X_test[i][1] == 1:
          x_tb.append(X_test[i])
    if X_test[i][2] == 0.4:
          x_ft.append(X_test[i])
    if X_test[i][2] == 0:
          x_it.append(X_test[i])

# for i in range(len(x_it)):
#     if x_it[i][0] > 0.8 and x_it[i][1] > 0.8:
#         u_it.append([1.5, 0, 0, 1.5])
#         x_i.append(x_it[i])
#     elif x_it[i][0] <= 0.8 and x_it[i][1] > 0.8:
#         u_it.append([33.0 / 62.0, 4.0 / math.sqrt(11.0), 0, 0.3])
#         x_i.append(x_it[i])
#     elif x_it[i][0] <= 0.8 and x_it[i][1] <= 0.8:
#         u_it.append([77.0 / 558.0, 4.0 / math.sqrt(11.0), 4.0 / math.sqrt(11.0), 9.0 / 310.0])
#         x_i.append(x_it[i])
#     elif x_it[i][0] > 0.8 and x_it[i][1] <= 0.8:
#         u_it.append([33.0 / 62.0, 0, 4.0 / math.sqrt(11.0), 0.3])
#         x_i.append(x_it[i])
          
for i in range(len(x_it)):
    if x_it[i][0] > 0.5 and x_it[i][1] > 0.5:
        u_it.append([1, 0.75, -0.5, 1])
        x_i.append(x_it[i])
    elif x_it[i][0] <= 0.5 and x_it[i][1] > 0.5:
        u_it.append([2, 0.75, 0.5, 1])
        x_i.append(x_it[i])
    elif x_it[i][0] <= 0.5 and x_it[i][1] <= 0.5:
        u_it.append([1, -0.75, 0.5, 1])
        x_i.append(x_it[i])
    elif x_it[i][0] > 0.5 and x_it[i][1] <= 0.5:
        u_it.append([3, -0.75, -0.5, 1])
        x_i.append(x_it[i])

x_i = np.array(x_i)
u_i = np.array(u_it)
x_tb = np.array(x_tb)
x_lr = np.array(x_lr)
x_ft = np.array(x_ft)
x_it = np.array(x_it)

X_i = np.vstack([x_i[:, 0], x_i[:, 1], x_i[:, 2]]).T
X_tb = np.vstack([x_tb[:, 0], x_tb[:, 1], x_tb[:, 2]]).T
X_lr = np.vstack([x_lr[:, 0], x_lr[:, 1], x_lr[:, 2]]).T
U_i = np.vstack((u_i[:, 0], u_i[:, 1], u_i[:, 2], u_i[:, 3])).T 
X_it = np.vstack((x_it[:, 0], x_it[:, 1], x_it[:, 2])).T 
X_ft = np.vstack((x_ft[:, 0], x_ft[:, 1], x_it[:, 2])).T 

# fig = plt.figure(figsize = (10, 7))
# ax = plt.axes(projection ="3d")
# ax.scatter3D(X_u_train[:, 0], X_u_train[:, 1], u_train[:, 0], color = "green")
# plt.title("simple 3D scatter plot")

plt.scatter(X_it[:, 0], X_it[:, 1], c ="blue", s = 1)
plt.show()


In [None]:
model = PhysicsInformedNN(X_i, U_i, X_test, X_tb, X_lr, layers)
model.train(0)

In [None]:
rho_pred, u_pred, v_pred, p_pred = model.predict(X_test)
preds =  torch.stack([rho_pred, u_pred, v_pred, p_pred], dim=1)
preds = preds.detach().cpu().numpy()
rho_pred = preds[:, 0]
u_pred = preds[:, 1]
v_pred = preds[:, 2]
p_pred = preds[:, 3]

In [None]:
n = 101
fig = plt.figure(figsize=(20, 5))
con_lv = 120
ax1 = fig.add_subplot(1, 1, 1)
_u = u_pred.reshape(n, n, 21)
contour1 = ax1.contourf(_u[:, :, 20], con_lv, origin='lower', cmap='rainbow', aspect='auto')
ax1.set_aspect('equal')
ax1.set_xlabel('x')
ax1.set_ylabel('y')
ax1.set_title('[u_pred]')

plt.tight_layout()
plt.show()


In [None]:
fig = plt.figure(figsize=(20, 5))
ax1 = fig.add_subplot(1, 1, 1)
_p = p_pred.reshape(n ,n, 40)
contour1 = ax1.contourf(_p[:, :, 19], con_lv, origin='lower', cmap='rainbow', aspect='auto')
ax1.set_aspect('equal')
ax1.set_xlabel('x')
ax1.set_ylabel('y')
ax1.set_title('[p_pred]')

plt.tight_layout()
plt.show()


In [None]:
fig = plt.figure(figsize=(20, 5))
ax1 = fig.add_subplot(1, 1, 1)
_V = rho_pred.reshape(n, n)
contour1 = ax1.contourf(_V, con_lv, origin='lower', cmap='rainbow', aspect='auto')
ax1.set_aspect('equal')
ax1.set_xlabel('x')
ax1.set_ylabel('y')
ax1.set_title('[v_mag_p]')

plt.tight_layout()
plt.show()
