In [2]:
%matplotlib inline

import numpy as np
import math
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.ticker import LogLocator
from matplotlib.ticker import LogLocator, FormatStrFormatter
import random
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributions as dist
from torch.utils.data import Dataset, DataLoader

# NOTE(review): EPS is not referenced in the cells visible here; importance_sampling
# below uses its own local eps = 1e-6 — confirm which value is intended where.
EPS = 1e-5  # small constant intended for numerical stability control

## importance sampling during training

Below is an example implementation of a PINN in PyTorch that incorporates self-adaptive importance sampling similar to the paper. In this simplified example, we solve a one-dimensional toy problem, $u_{xx} + \sin(\pi x) = 0$ on $[0, 1]$ with $u(0) = u(1) = 0$, by training on collocation points that are periodically redrawn so that regions where the PDE residual is large are sampled more densely.

You can adjust the PDE, network architecture, and hyperparameters as needed.

### Sampling Strategy:

Initially, collocation points are sampled uniformly.
In the function `importance_sampling`, the PDE residual is evaluated on a dense grid over the domain. The absolute residual at each grid point (plus a small epsilon, so that no point has zero probability) is normalized into a probability distribution, and the new collocation points are drawn from the grid points, with replacement, according to this distribution. This mimics failure-informed sampling by focusing training on regions where the PDE is not well satisfied.
### Training Loop:
The model trains for a fixed number of epochs (`n_epochs`, here 1000). Every `sampling_interval` epochs (here, every 200 epochs), the collocation points are replaced by a new set drawn with the importance sampling procedure.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

# Define the neural network for the PINN
class PINN(nn.Module):
    """Fully connected network with tanh activations between linear layers.

    Args:
        layers: Sequence of layer widths, e.g. ``[1, 50, 50, 1]``. Each pair of
            consecutive entries gives the input/output size of one
            ``nn.Linear``.
    """

    def __init__(self, layers):
        super().__init__()
        # One linear map per adjacent (in, out) pair of widths, built in order.
        self.layers = nn.ModuleList(
            [nn.Linear(n_in, n_out) for n_in, n_out in zip(layers[:-1], layers[1:])]
        )
        self.activation = nn.Tanh()

    def forward(self, x):
        """Apply tanh after every linear layer except the last one."""
        n_hidden = len(self.layers) - 1
        hidden = x
        for idx in range(n_hidden):
            hidden = self.activation(self.layers[idx](hidden))
        # The output layer is left linear (no activation).
        return self.layers[n_hidden](hidden)

# Define the PDE residual
def pde_residual(model, x):
    """Evaluate the residual r(x) = u_xx(x) + sin(pi * x) of the toy PDE.

    Args:
        model: Callable mapping a tensor of points to predictions u(x). It must
            be twice differentiable with respect to its input.
        x: Tensor of points at which to evaluate the residual. The caller's
            tensor is left untouched: if it does not already track gradients,
            differentiation is done with respect to a detached view of it.

    Returns:
        Tensor with the same shape as ``x`` holding the residual. It stays
        attached to the autograd graph (``create_graph=True``), so it can be
        used directly in a loss that is back-propagated to the model.
    """
    # Derivatives are needed even when the caller runs inside torch.no_grad().
    with torch.enable_grad():
        if not x.requires_grad:
            # Use a fresh leaf sharing x's data instead of flipping the flag on
            # the caller's tensor. Flipping it in place would make every later
            # backward() accumulate an unused .grad on the collocation points.
            x = x.detach().requires_grad_(True)
        u = model(x)
        # First derivative u_x
        u_x = torch.autograd.grad(
            u, x, grad_outputs=torch.ones_like(u), create_graph=True
        )[0]
        # Second derivative u_xx
        u_xx = torch.autograd.grad(
            u_x, x, grad_outputs=torch.ones_like(u_x), create_graph=True
        )[0]
        # PDE: u_xx + sin(pi * x) = 0
        residual = u_xx + torch.sin(np.pi * x)
    return residual

# Generate initial collocation points uniformly in the domain [lb, ub]
def generate_collocation_points(n_points, lb, ub):
    """Draw ``n_points`` collocation points uniformly at random between lb and ub.

    Uses NumPy's global random state.

    Returns:
        float32 tensor of shape ``(n_points, 1)``.
    """
    shape = (n_points, 1)
    draws = np.random.uniform(low=lb, high=ub, size=shape)
    return torch.as_tensor(draws, dtype=torch.float32)

# Importance sampling: Evaluate the residual on a fine grid and sample new points
def importance_sampling(model, n_points, lb, ub, n_grid=1000):
    """Resample collocation points with probability proportional to |residual|.

    The PDE residual is evaluated on an evenly spaced grid over ``[lb, ub]``.
    New points are then drawn from the grid points (with replacement) using
    the normalized absolute residual as the sampling distribution.

    Args:
        model: Network passed to ``pde_residual``.
        n_points: Number of collocation points to draw.
        lb, ub: Lower and upper bounds of the domain.
        n_grid: Number of grid points used to estimate the residual
            distribution (default 1000).

    Returns:
        float32 tensor of shape ``(n_points, 1)``, placed on the same device as
        the model's parameters (CPU if the model exposes no parameters).

    Raises:
        ValueError: If ``n_grid`` is smaller than 1 or the residual contains
            NaN/inf values, since no sampling distribution can be built then.
    """
    if n_grid < 1:
        raise ValueError(f"n_grid must be at least 1, got {n_grid}")

    # Fine grid over the domain, kept as a NumPy column vector for indexing.
    x_fine = np.linspace(lb, ub, n_grid).reshape(-1, 1)

    # Evaluate on the device the model lives on; a CPU-only grid would fail
    # against a model that has been moved to an accelerator.
    try:
        device = next(model.parameters()).device
    except (AttributeError, StopIteration):
        device = torch.device("cpu")
    x_fine_tensor = torch.tensor(x_fine, dtype=torch.float32, device=device)

    residual = pde_residual(model, x_fine_tensor)
    # .cpu() is needed before .numpy() for non-CPU tensors; reshape(-1) (rather
    # than squeeze) keeps the array 1-D even when the grid has a single point.
    res_val = residual.detach().abs().reshape(-1).cpu().numpy()
    if not np.all(np.isfinite(res_val)):
        raise ValueError(
            "PDE residual contains NaN or inf values; cannot build a sampling distribution"
        )

    # Avoid zero probabilities by adding a small epsilon
    eps = 1e-6
    weights = res_val + eps
    weights = weights / np.sum(weights)

    # Sample grid indices according to the residual weights
    indices = np.random.choice(len(x_fine), size=n_points, replace=True, p=weights)
    sampled_points = x_fine[indices]
    return torch.tensor(sampled_points, dtype=torch.float32, device=device)

# Hyperparameters and domain setup
seed = 0                  # RNG seed (NumPy + PyTorch) so Restart & Run All reproduces the run
lb, ub = 0.0, 1.0         # Domain boundaries
n_initial = 100           # Number of collocation points (initial draw and every resample)
n_epochs = 1000           # Total training epochs
sampling_interval = 200   # How often (in epochs) to update the collocation points
learning_rate = 1e-3      # Learning rate

# Seed both generators before the network is initialised and before any points
# are drawn: weight initialisation uses PyTorch's RNG, and the collocation
# sampling uses NumPy's global RNG.
np.random.seed(seed)
torch.manual_seed(seed)

# Define the network architecture (input, hidden layers, output)
layers = [1, 50, 50, 1]
model = PINN(layers)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
mse_loss = nn.MSELoss()

# Generate the initial set of collocation points
collocation_points = generate_collocation_points(n_initial, lb, ub)

# Training loop
log_interval = 100  # Print the loss every `log_interval` epochs

# The boundary points never change, so build this tensor once rather than on
# every epoch.
x_bc = torch.tensor([[lb], [ub]], dtype=torch.float32)

for epoch in range(n_epochs):
    optimizer.zero_grad()

    # PDE loss: mean squared residual at the current collocation points
    residual = pde_residual(model, collocation_points)
    loss_res = mse_loss(residual, torch.zeros_like(residual))

    # Boundary loss enforcing u(lb) = 0 and u(ub) = 0. The residual alone only
    # constrains u_xx, which leaves u undetermined up to a linear function.
    u_bc = model(x_bc)
    loss_bc = mse_loss(u_bc, torch.zeros_like(u_bc))

    loss = loss_res + loss_bc
    loss.backward()
    optimizer.step()

    if epoch % log_interval == 0:
        print(f"Epoch {epoch}, Total Loss: {loss.item():.6f}")

    # Every sampling_interval epochs, redraw the collocation points with
    # importance sampling so training focuses on high-residual regions.
    if (epoch + 1) % sampling_interval == 0:
        collocation_points = importance_sampling(model, n_initial, lb, ub)
        print(f"Resampled collocation points at epoch {epoch+1}")

# Evaluate the trained model on a fine grid and compare it with the exact solution.
x_fine = np.linspace(lb, ub, 1000).reshape(-1, 1)
x_fine_tensor = torch.tensor(x_fine, dtype=torch.float32)

model.eval()
with torch.no_grad():
    # No graph is recorded inside no_grad, so the prediction converts directly.
    u_pred = model(x_fine_tensor).cpu().numpy()

# Exact solution of u_xx + sin(pi*x) = 0 with u(0) = u(1) = 0:
# u(x) = sin(pi*x) / pi^2
u_exact = np.sin(np.pi * x_fine) * (1/np.pi**2)

fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(x_fine, u_pred, label='PINN Approximation', linewidth=2)
ax.plot(x_fine, u_exact, 'r--', label='Exact Solution', linewidth=2)
ax.set_xlabel('x')
ax.set_ylabel('u(x)')
ax.set_title('PINN Approximation vs Exact Solution')
ax.legend()
plt.show()


In [None]:
# Evaluate G(x) and the weighted physics residual for each sample
def evaluate_G_and_residual(PINN_model, samples, t_coll, params, lambda_bc=2.0):
    """Compute the limit state value and a combined loss value for every sample.

    For each ``samples[i]`` the inputs are built with ``make_input_params``, the
    limit state function is evaluated, and the scalar
    ``pde_loss + lambda_bc * boundary_loss`` is recorded.

    Args:
        PINN_model: Model forwarded to ``limit_state_function_G``, ``pde_loss``
            and ``boundary_loss``.
        samples: Collection supporting ``len()`` and integer indexing; each
            entry is passed to ``make_input_params``.
        t_coll: Tensor of collocation inputs. Its ``requires_grad`` flag is
            switched on in place as a side effect of this function.
        params: Mapping that provides ``params['norm_info']``.
        lambda_bc: Weight applied to the boundary loss in the combined value.

    Returns:
        Tuple ``(G_vals, residual_vals)`` of NumPy arrays with one entry per
        sample.
    """
    # Boundary evaluations use an all-zero tensor shaped like t_coll.
    t0_tensor = torch.zeros_like(t_coll).requires_grad_(True)
    g_list, combined_list = [], []

    for idx in range(len(samples)):
        norm_info = params['norm_info']
        input_params = make_input_params(t_coll, samples[idx], norm_info)

        # requires_grad_ works in place, so the entries held by input_params
        # (passed to the limit state function below) track gradients as well.
        m_tensor, mu_tensor, k_tensor, y0_tensor, v0_tensor = (
            input_params[j].requires_grad_(True) for j in range(5)
        )
        physics_inputs = (m_tensor, mu_tensor, k_tensor, y0_tensor, v0_tensor)

        # Limit state function G(x) for this sample
        g_value = limit_state_function_G(PINN_model, t_coll, input_params, differentiable=False)
        g_list.append(g_value.item())

        # PDE loss at the collocation inputs
        physics_term = pde_loss(
            PINN_model, t_coll.requires_grad_(True), *physics_inputs, norm_info=norm_info
        ).item()
        # Boundary loss at the zero tensor
        boundary_term = boundary_loss(
            PINN_model, t0_tensor, *physics_inputs, norm_info=norm_info
        ).item()
        combined_list.append(physics_term + lambda_bc * boundary_term)

    # NumPy arrays are returned so the values can be used outside of PyTorch.
    return np.array(g_list), np.array(combined_list)