In [1]:
import torch
import torch.autograd as autograd         # computation graph
from torch import Tensor                  # tensor node in the computation graph
import torch.nn as nn                     # neural networks
import torch.optim as optim               # optimizers e.g. gradient descent, ADAM, etc.

import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from mpl_toolkits.axes_grid1 import make_axes_locatable
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.ticker
from torch.nn.parameter import Parameter
import matplotlib as mpl

import numpy as np
import time
#from pyDOE import lhs         #Latin Hypercube Sampling
import scipy.io
from scipy.io import savemat

from smt.sampling_methods import LHS

#Set default dtype to float32
torch.set_default_dtype(torch.float)

#PyTorch random number generator
torch.manual_seed(1234)

# Random number generators in other libraries
np.random.seed(1234)

# Device configuration
device = torch.device('cuda:1' if torch.cuda.is_available() else 'cpu')

print(device)

if device == 'cuda': 
    print(torch.cuda.get_device_name())
    

cuda:1


In [2]:
# from google.colab import drive
# drive.mount('/content/gdrive')

In [3]:
# %cd '/content/gdrive/MyDrive/Virginia Tech /Fall 2022/Codes from GPU/PINN_Stan/2D Klein Gordon/stan'

In [4]:
# !pip install smt

In [5]:
def true_2D_1(xt): #True function for 2D_1 Klein Gordon Equation x \in [-50,50] , t \in [0,10]
    y = xt[:,0]*np.cos(xt[:,1])
    return y.reshape(-1,1)    

In [6]:
label = "KG_swish"
loss_thresh = 0.01

x = np.linspace(-5,5,500).reshape(-1,1)
t = np.linspace(0,10,1000).reshape(-1,1)

X,T = np.meshgrid(x,t)

X = X.flatten('F').reshape(-1,1)
T = T.flatten('F').reshape(-1,1)
  
xt = np.hstack((X,T))

y_true = true_2D_1(xt)
y_true_norm = np.linalg.norm(y_true,2)

#bound_pts_idx = ((X == -5) + (X == 5) + (T == 0)).reshape(-1,)

#xt_bound = xt[bound_pts_idx,:]
#y_bound = y_true[bound_pts_idx,:]


xt_test_tensor = torch.from_numpy(xt).float().to(device)


lb_xt = xt[0]
ub_xt = xt[-1]

In [7]:
def trainingdata(N_I,N_B,N_f,seed):
    '''Boundary Conditions''' 
    
    x01 = np.array([[0.0,1.0],[0.0,0.0]])
    sampling = LHS(xlimits=x01,random_state=seed)
    samples = sampling(N_I)
    xt_BC1 = lb_xt + (ub_xt - lb_xt)*samples
    y_BC1 = true_2D_1(xt_BC1)
    

    x01 = np.array([[0.0,0.0],[0.0,1.0]])
    sampling = LHS(xlimits=x01,random_state=seed+1)
    samples = sampling(int(N_B/2))
    xt_coll = lb_xt + (ub_xt - lb_xt)*samples
    xt_BC2 = lb_xt + (ub_xt - lb_xt)*samples
    y_BC2 = true_2D_1(xt_BC2)
    
    x01 = np.array([[1.0,1.0],[0.0,1.0]])
    sampling = LHS(xlimits=x01,random_state=seed+2)
    samples = sampling(int(N_B/2))
    xt_coll = lb_xt + (ub_xt - lb_xt)*samples
    xt_BC3 = lb_xt + (ub_xt - lb_xt)*samples
    y_BC3 = true_2D_1(xt_BC3)

    xt_BC = np.vstack((xt_BC1,xt_BC2,xt_BC3))
    y_BC = np.vstack((y_BC1,y_BC2,y_BC3))

    '''Collocation Points'''

    # Latin Hypercube sampling for collocation points 
    # N_f sets of tuples(x,t)
    x01 = np.array([[0.0,1.0],[0.0,1.0]])
    sampling = LHS(xlimits=x01,random_state =seed)
    samples = sampling(N_f)
    xt_coll = lb_xt + (ub_xt - lb_xt)*samples
    
    xt_coll = np.vstack((xt_coll, xt_BC)) # append training points to collocation points 

    return xt_coll, xt_BC, y_BC

In [8]:
class Sequentialmodel(nn.Module):
    
    def __init__(self,layers):
        super().__init__() #call __init__ from parent class 
              
        'activation function'
        self.activation = nn.Sigmoid()

        'loss function'
        self.loss_function = nn.MSELoss(reduction ='mean')
        
        'Initialise neural network as a list using nn.Modulelist'  
        self.linears = nn.ModuleList([nn.Linear(layers[i], layers[i+1]) for i in range(len(layers)-1)])
        
        # std = gain * sqrt(2/(input_dim+output_dim))
        for i in range(len(layers)-1):
            nn.init.xavier_normal_(self.linears[i].weight.data, gain=1.0)
            # set biases to zero
            nn.init.zeros_(self.linears[i].bias.data)
            
        
        self.beta = Parameter(torch.ones((50,len(layers)-2)))
        self.beta.requiresGrad = True

            
    'foward pass'
    def forward(self,xt):
        if torch.is_tensor(xt) != True:         
            xt = torch.from_numpy(xt)                
        
        ubxt = torch.from_numpy(ub_xt).float().to(device)
        lbxt = torch.from_numpy(lb_xt).float().to(device)
    
                      
        #preprocessing input 
        xt = (xt - lbxt)/(ubxt - lbxt)
        
        #convert to float
        a = xt.float()
        
        for i in range(len(layers)-2):
            z = self.linears[i](a)
            a = z*self.activation(self.beta[:,i]*z)
            
        a = self.linears[-1](a) 
         
        return a
                        
    def loss_BC(self,xt,y):
                
        loss_bc = self.loss_function(self.forward(xt), y)
                
        return loss_bc
    
    def loss_PDE(self, xt_coll, f_hat):
        
        g = xt_coll.clone()             
        g.requires_grad = True
        y = self.forward(g) 
        
        
        y_x_t = autograd.grad(y,g,torch.ones([xt_coll.shape[0], 1]).to(device), retain_graph=True, create_graph=True,allow_unused = True)[0]
        
        y_xx_tt = autograd.grad(y_x_t,g,torch.ones(xt_coll.shape).to(device), create_graph=True,allow_unused = True)[0]

        #du_dx = u_x_t[:,[0]]
        
        d2y_dx2 = y_xx_tt[:,[0]]
        d2y_dt2 = y_xx_tt[:,[1]]    
        

        f = d2y_dt2 - d2y_dx2 + torch.pow(y,2) + (g[:,0]*torch.cos(g[:,1])).reshape(-1,1) - (torch.pow(g[:,0],2)*torch.pow(torch.cos(g[:,1]),2)).reshape(-1,1)
        
        loss_f = self.loss_function(f,f_hat)
                
        return loss_f
    
    def loss(self,xt_BC,y_BC,xt_coll,f_hat):

        loss_BC = self.loss_BC(xt_BC,y_BC)
        loss_f = self.loss_PDE(xt_coll,f_hat)
        
        loss_val = loss_BC + loss_f
        
        return loss_val
         
    'test neural network'
    def test(self):
        y_pred = self.forward(xt_test_tensor)
        y_pred = y_pred.cpu().detach().numpy()
   
        return y_pred
    
    def test_loss(self):
        y_pred = self.test()
        
        test_mse = np.mean(np.square(y_pred.reshape(-1,1) - y_true.reshape(-1,1)))
        test_re = np.linalg.norm(y_pred.reshape(-1,1) - y_true.reshape(-1,1),2)/y_true_norm
        
        return test_mse, test_re  

In [9]:
def data_update(loss_np):
    train_loss.append(loss_np)
    beta_val.append(PINN.beta.cpu().detach().numpy())
    
    test_mse, test_re = PINN.test_loss()
    test_mse_loss.append(test_mse)
    test_re_loss.append(test_re)

In [10]:
def train_step(xt_BC, y_BC, xt_coll, f_hat,seed):
    # x_coll_np_array = colloc_pts(N_f,seed*123)
    # x_coll_train = torch.from_numpy(x_coll_np_array).float().to(device)        
    
    # f_hat = torch.zeros(x_coll_train.shape[0],1).to(device)
    
    def closure():
        optimizer.zero_grad()
        loss = PINN.loss(xt_BC, y_BC, xt_coll,f_hat)
        loss.backward()
        #print(loss.cpu().detach().numpy())
        return loss

    optimizer.step(closure)

In [11]:
def train_model(max_iter,rep): 
  print(rep) 
  torch.manual_seed(rep*11)
  start_time = time.time() 
  thresh_flag = 0

  xt_coll, xt_BC, y_BC = trainingdata(N_I,N_B,N_f,rep*11)
  xt_coll = torch.from_numpy(xt_coll).float().to(device)
  xt_BC = torch.from_numpy(xt_BC).float().to(device)
  y_BC = torch.from_numpy(y_BC).float().to(device)

  f_hat = torch.zeros(xt_coll.shape[0],1).to(device)

  for i in range(max_iter):
    train_step(xt_BC, y_BC, xt_coll,f_hat,i)
    
    loss_np = PINN.loss(xt_BC, y_BC, xt_coll,f_hat).cpu().detach().numpy()
    if(thresh_flag == 0):
        if(loss_np < loss_thresh):
            time_threshold[rep] = time.time() - start_time
            epoch_threshold[rep] = i+1            
            thresh_flag = 1       
    data_update(loss_np)
    
    print(i,"Train Loss",train_loss[-1],"Test MSE",test_mse_loss[-1],"Test RE",test_re_loss[-1])
    

    

  elapsed_time[rep] = time.time() - start_time  
  print('Training time: %.2f' % (elapsed_time[rep]))

In [12]:

#for tune_reps in range(4,5):
max_reps = 10 #10
max_iter = 200 #100


train_loss_full = []
test_mse_full = []
test_re_full = []
beta_full = []
elapsed_time= np.zeros((max_reps,1))
time_threshold = np.empty((max_reps,1))
time_threshold[:] = np.nan
epoch_threshold = max_iter*np.ones((max_reps,1))

N_I = 200  #Total number of data points for 'y'
N_B = 400
N_f = 10000 #Total number of collocation points

for reps in range(max_reps):
  print(reps)

  train_loss = []
  test_mse_loss = []
  test_re_loss = []
  beta_val = []

  torch.manual_seed(reps*36)

  layers = np.array([2,50,50,50,50,50,50,50,50,50,1]) #9 hidden layers

  PINN = Sequentialmodel(layers)

  PINN.to(device)

  'Neural Network Summary'
  print(PINN)

  params = list(PINN.parameters())


  optimizer = torch.optim.LBFGS(PINN.parameters(), lr=0.1, 
                            max_iter = 20, 
                            max_eval = 30, 
                            tolerance_grad = 1e-8, 
                            tolerance_change = 1e-8, 
                            history_size = 100, 
                            line_search_fn = 'strong_wolfe')




  nan_flag = train_model(max_iter,reps)

  torch.save(PINN.state_dict(),label+'_'+str(reps)+'.pt')
  train_loss_full.append(train_loss)
  test_mse_full.append(test_mse_loss)
  test_re_full.append(test_re_loss)
  #elapsed_time[reps] = time.time() - start_time
  beta_full.append(beta_val)

  #print('Training time: %.2f' % (elapsed_time[reps]))

mdic = {"train_loss": train_loss_full,"test_mse_loss": test_mse_full,"test_re_loss": test_re_full,"Time": elapsed_time, "beta": beta_full, "label": label,"Thresh Time": time_threshold,"Thresh epoch": epoch_threshold}
savemat(label+'.mat', mdic)

0
Sequentialmodel(
  (activation): Sigmoid()
  (loss_function): MSELoss()
  (linears): ModuleList(
    (0): Linear(in_features=2, out_features=50, bias=True)
    (1): Linear(in_features=50, out_features=50, bias=True)
    (2): Linear(in_features=50, out_features=50, bias=True)
    (3): Linear(in_features=50, out_features=50, bias=True)
    (4): Linear(in_features=50, out_features=50, bias=True)
    (5): Linear(in_features=50, out_features=50, bias=True)
    (6): Linear(in_features=50, out_features=50, bias=True)
    (7): Linear(in_features=50, out_features=50, bias=True)
    (8): Linear(in_features=50, out_features=50, bias=True)
    (9): Linear(in_features=50, out_features=1, bias=True)
  )
)
0
0 Train Loss 76.16456 Test MSE 4.377091757752036 Test RE 1.0000020394712161
1 Train Loss 76.16455 Test MSE 4.377093489491173 Test RE 1.000002237290064
2 Train Loss 57.40902 Test MSE 9.142752691220755 Test RE 1.4452618750990582
3 Train Loss 50.43755 Test MSE 8.157503064814474 Test RE 1.365169833

93 Train Loss 0.014899708 Test MSE 0.0017192306205169195 Test RE 0.019818698419818672
94 Train Loss 0.01464084 Test MSE 0.0015843754818792312 Test RE 0.019025545461806705
95 Train Loss 0.014467437 Test MSE 0.0016203773900689744 Test RE 0.019240490864702187
96 Train Loss 0.014167456 Test MSE 0.001465006823689397 Test RE 0.018294809180839165
97 Train Loss 0.013785614 Test MSE 0.0014377925552702423 Test RE 0.01812408855767697
98 Train Loss 0.013287755 Test MSE 0.0012393881392842516 Test RE 0.016827195023552007
99 Train Loss 0.012735401 Test MSE 0.0011474568092953387 Test RE 0.016191095486630766
100 Train Loss 0.012320614 Test MSE 0.001068644787198076 Test RE 0.01562516981940129
101 Train Loss 0.011266165 Test MSE 0.00098404886922061 Test RE 0.014993961576661278
102 Train Loss 0.010568105 Test MSE 0.001051833391964347 Test RE 0.015501778863310547
103 Train Loss 0.010285802 Test MSE 0.0011760402415862117 Test RE 0.016391517118200416
104 Train Loss 0.010142869 Test MSE 0.0010882738451950741 

KeyboardInterrupt: 

In [None]:
import scipy.io as sio
import numpy as np

In [None]:
for tune_reps in range(4,5):
    label = "KG_swish_tune"+str(tune_reps)+".mat"
    data = sio.loadmat(label)
 
    #re = np.array(data["test_re_loss"])
    #print("tune_reps"," ",np.mean(re[0][0]))

In [None]:
tune_reps = 4
label = "KG_swish_tune"+str(tune_reps)+".mat"
data = sio.loadmat(label)
for k in range(10):
    print(tune_reps," ",data["test_re_loss"][0][k].shape)

In [None]:
np.mean(data["test_re_loss"][:,-1])