In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import mlbfgs
from torch.utils.data import DataLoader, TensorDataset

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cpu')
print(f'Using device: {device}')
res_scale = .1

class PINN(nn.Module):
    def __init__(self):
        super(PINN, self).__init__()
        self.hidden_layer1 = nn.Linear(1, 24).double()
        self.hidden_layer2 = nn.Linear(24, 24).double()
        self.hidden_layer3 = nn.Linear(1, 13).double()
        self.hidden_layer4 = nn.Linear(24, 13).double()

        self.hidden_layer5 = nn.Linear(13, 13).double()
        self.hidden_layer6 = nn.Linear(13, 13).double()
        self.hidden_layer7 = nn.Linear(13, 7).double()
        self.hidden_layer8 = nn.Linear(13, 7).double()

        self.hidden_layer9 = nn.Linear(7, 7).double()
        self.hidden_layer10 = nn.Linear(7, 7).double()
        self.hidden_layer11 = nn.Linear(7, 1).double()
        self.hidden_layer12 = nn.Linear(7, 1).double()
    
        
    def forward(self, y):
        res = torch.sin(self.hidden_layer1(y))
        res = torch.tanh(self.hidden_layer2(res))
        y = self.hidden_layer3(y) + res_scale * self.hidden_layer4(res)

        res = torch.sin(self.hidden_layer5(y))
        res = torch.tanh(self.hidden_layer6(res))
        y = self.hidden_layer7(y) + res_scale * self.hidden_layer8(res)

        res = torch.sin(self.hidden_layer9(y))
        res = torch.tanh(self.hidden_layer10(res))
        y = self.hidden_layer11(y) + res_scale * self.hidden_layer12(res)
        return y
    
    def U(self, y):
        return (self(y) - self(-y)) / 2

    def get_lam(self):
        y = torch.linspace(-2,2,10,dtype=torch.float64).view(-1,1).to(device)
        y.requires_grad = True
        U = self.U(y)
        U_y = torch.autograd.grad(U, y, grad_outputs=torch.ones_like(U), create_graph=True)[0]
        U_yy = torch.autograd.grad(U_y, y, grad_outputs=torch.ones_like(U_y), create_graph=True)[0]
        return torch.mean(torch.divide(-(1 + U_y) * U_y - (U + y) * U_yy, y * U_yy))
        # return torch.mean(torch.divide((y + U) * U_y,U - y * U_y))
    def get_fixed_lam(self):
        return .4
    
def f(y,U,U_y,lam):
    return -lam * U + ((1 + lam) * y + U) * U_y

def compute_derivative(f, y, model, lam, orders,finite=False):
    y.requires_grad = True
    U = model.U(y)
    U_y = torch.autograd.grad(U, y, grad_outputs=torch.ones_like(U), create_graph=True)[0]
    lam = model.get_lam()
    f_val = f(y, U, U_y, lam)
    h = y[1] - y[0]
    res = []
    if not finite:
        for _ in range(int(orders.max())):
            f_val = torch.autograd.grad(f_val, y, grad_outputs=torch.ones_like(f_val), create_graph=True)[0]
            if _ + 1 in orders:
                res.append(f_val)
    else:
        for _ in range(int(orders.max())):
            f_val = (y[1:] - y[:-1]) / h
            if _ + 1 in orders:
                res.append(f_val)
    return res

# @torch.compile
def Loss(model, y, collocation_points,mode,step):
    y.requires_grad = True
    U = model.U(y)
    U_y = torch.autograd.grad(U, y, grad_outputs=torch.ones_like(U), create_graph=True)[0]
    # U_yy = torch.autograd.grad(U, y, grad_outputs=torch.ones_like(U), create_graph=True,retain_graph=)[0]
    if mode == 'fixed':
        lam = model.get_fixed_lam()
    if mode == 'learned':
        lam = model.get_lam()


    # Equation loss
    f_val = f(y, U, U_y,lam)

    # Smooth loss 3rd and fifth derivative
    derivatives = compute_derivative(f,collocation_points,model,lam, orders=np.array([3.0]),finite=True)
    f_yyy = derivatives[0]
    #f_yyyyy = derivatives[1]
 

    # Condition loss U(-2) = 1
    g = model.U(torch.tensor([-2.0], dtype=y.dtype, device=y.device)) - 1
    
    equation_loss = torch.mean(f_val**2)
    condition_loss = torch.mean(g**2)

   # experiment_loss = torch.exp(torch.tensor(data=[-0.5],dtype=torch.float64) * step + torch.tensor(data=[1],dtype=torch.float64)) * torch.mean(U_y**2)
    total_loss = equation_loss + condition_loss + 1e-3*torch.mean(f_yyy**2) #+ 1e-10*torch.mean(f_yyyyy**2) 
    return total_loss



Using device: cpu


### Fixed lambda 

In [11]:
# Initialize model 
import slim
model = PINN().to(device)
model = torch.compile(model)

# optimizer = mlbfgs.LBFGSOptimizer(
#     model_parameters=model.parameters(),
#     lr = .1,              
#     momentum=0.9,       
#     weight_decay=0.0,    
#     rho_min=0.0001,
#     mm_p=0.9,
#     mm_g=0.99,
#     update_freq=100,      
#     hist_sz=100,
#     decay_period=10,
#     damping=0.2,         
#     kl_clip=0.005         
# )
optimizer = slim.SlimQN(
    model_parameters=model.parameters(),
    lr = .05,              
    momentum=0.9,       
    weight_decay=0.0,    
    rho_min=0.0001,
    mm_p=0.9,
    mm_g=0.99,
    update_freq=100,      
    hist_sz=100,
    decay_period=10,
    damping=0.2,         
    kl_clip=0.005         
)
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99)

# Training parameters
num_epochs = 1000
batch_size = 8912 // 2
y_data = torch.linspace(-2,2,10000,dtype=torch.float64).view(-1, 1).to(device)

Ns = 1000
collocation_points = torch.FloatTensor(Ns).uniform_(-1, 1).view(-1, 1).double().to(device)
collocation_points = (collocation_points - collocation_points.mean()) / collocation_points.std()

# Create DataLoader for y_data and collocation_points
y_dataset = TensorDataset(y_data)
collocation_dataset = TensorDataset(collocation_points)
y_loader = DataLoader(y_dataset, batch_size=batch_size, shuffle=True, num_workers=8)
collocation_loader = DataLoader(collocation_dataset, batch_size=batch_size, shuffle=True, num_workers=8)

def closure(y_batch, collocation_batch,step):
    optimizer.zero_grad()  
    loss = Loss(model, y_batch, collocation_batch,'fixed',step)  
    loss.backward()  
    return loss

for epoch in range(num_epochs):
    for y_batch, collocation_batch in zip(y_loader, collocation_loader):
        y_batch = y_batch[0].to(device)
        collocation_batch = collocation_batch[0].to(device)
        optimizer.step() #lambda: closure(y_batch, collocation_batch,epoch)
    
    if epoch % 100 == 0:
        y_batch = next(iter(y_loader))[0].to(device)
        collocation_batch = next(iter(collocation_loader))[0].to(device)
        loss = Loss(model, y_batch, collocation_batch,'fixed',epoch)
        print(f'epoch {epoch}, loss {loss.item()}')
        if loss.item() <= 1e-8:
            break

[SLIM] initialize SlimQN optimizer:
-------------------------------------
	Initial learning rate: 0.05
	Momentum for update: 0.9
	Weight decay: 0.0
	Damping factor: 0.2
	Momentum for param: 0.9
	Momentum for grad: 0.99
	History vector size: 100
	Base Hessian update frequency: 100
	Gradient clipping: 0.005
	Number of threads: 16
-------------------------------------


Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7f6a8f437640>
Traceback (most recent call last):
  File "/home/kido/.local/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1479, in __del__
    self._shutdown_workers()
  File "/home/kido/.local/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1462, in _shutdown_workers
    if w.is_alive():
  File "/usr/lib/python3.10/multiprocessing/process.py", line 160, in is_alive
    Exception ignored in: assert self._parent_pid == os.getpid(), 'can only test a child process'<function _MultiProcessingDataLoaderIter.__del__ at 0x7f6a8f437640>

AssertionError: Traceback (most recent call last):
can only test a child process  File "/home/kido/.local/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1479, in __del__

    self._shutdown_workers()
  File "/home/kido/.local/lib/python3.10/site-packages/torch/utils/data/dataloader.py", line 1462, in _shutdown_workers
    if w.i

RuntimeError: DataLoader worker (pid(s) 481982) exited unexpectedly

### Lambda learned in the process 

In [None]:
def closure(y_batch, collocation_batch):
    optimizer.zero_grad()  
    loss = Loss(model, y_batch, collocation_batch,'learned')  
    loss.backward()  
    return loss

for epoch in range(num_epochs):
    for y_batch, collocation_batch in zip(y_loader, collocation_loader):
        y_batch = y_batch[0].to(device)
        collocation_batch = collocation_batch[0].to(device)
        optimizer.step(lambda: closure(y_batch, collocation_batch)) #
    
    if epoch % 10 == 0:
        y_batch = next(iter(y_loader))[0].to(device)
        collocation_batch = next(iter(collocation_loader))[0].to(device)
        loss = Loss(model, y_batch, collocation_batch,'learned')
        print(f'epoch {epoch}, loss {loss.item()}')
        if loss.item() <= 1e-8:
            break

In [None]:
y_test = 2*torch.sin(torch.linspace(-np.pi/2, np.pi/2, 100)).view(-1, 1).to(device)
y_test.requires_grad = True
# Get model predictions and detach to move to CPU
U_pred = model.U(y_test)
U_pred_y = torch.autograd.grad(U_pred, y_test, grad_outputs=torch.ones_like(U_pred), create_graph=True)[0]
U_pred_yy = torch.autograd.grad(U_pred_y, y_test, grad_outputs=torch.ones_like(U_pred_y), create_graph=True)[0]
U_pred_yyy = torch.autograd.grad(U_pred_yy, y_test, grad_outputs=torch.ones_like(U_pred_yy), create_graph=True)[0]
U_pred_yyyy = torch.autograd.grad(U_pred_yyy, y_test, grad_outputs=torch.ones_like(U_pred_yyy), create_graph=True)[0]
U_pred_yyyyy = torch.autograd.grad(U_pred_yyyy, y_test, grad_outputs=torch.ones_like(U_pred_yyyy), create_graph=True)[0]

lam = model.get_lam(y_test).detach().cpu().numpy()
print(lam)
residual = f(y_test,U_pred,U_pred_y,lam)
print(torch.sqrt(torch.mean(residual**2)))
U_pred = U_pred.detach().cpu().numpy()
# Generate exact solution using implicit formula
U_positive = np.linspace(0, 1, 100)
y_true = np.array([U_positive + U_positive**(1 + 1/lam), -U_positive - U_positive**(1 + 1/lam)]).flatten()
order = y_true.argsort()
U_sorted = np.array([-U_positive, U_positive]).flatten()[order]

y_sorted = y_true[order]

# Convert test data to numpy
y_test_np = y_test.detach().cpu().numpy()

# Plotting
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))

# Plot the PINN Prediction vs Exact Solution
ax1.plot(y_test_np, U_pred, '.-', label='PINN Prediction', color='#1f77b4', markersize=5)
ax1.plot(y_sorted, U_sorted, label='Exact Solution', color='#ff7f0e', linestyle='--', linewidth=2)
ax1.set_title('Comparison of PINN Prediction and Exact Solution')
ax1.set_xlabel('y')
ax1.set_ylabel('U')
ax1.grid(True, which='both', linestyle='--', linewidth=0.5)
ax1.legend()

# Plot the third derivative
ax2.plot(y_test_np, U_pred_yyy.detach().cpu().numpy(), '.-', label='Third Derivative of U', color='#2ca02c', markersize=5)
ax2.set_title('Third Derivative of U')
ax2.set_xlabel('y')
ax2.set_ylabel('d^3U/dy^3')
ax2.grid(True, which='both', linestyle='--', linewidth=0.51)
ax2.legend()

plt.tight_layout()
plt.show()