# Residual-Based Iterative Refinement

This notebook demonstrates **how to perform residual-based iterative refinement using the [HyPINO multi-physics neural operator](https://arxiv.org/abs/2509.05117)**.

HyPINO maps a given PDE specification, defined by its coefficients, source term, and boundary conditions, to a **Physics-Informed Neural Network (PINN)** that represents the PDE solution. The resulting PINN can be evaluated continuously over the domain and differentiated analytically to compute **physics residuals**.

### Why Iterative Refinement?

HyPINO generates weights for a fixed, relatively small target PINN architecture. To increase the representational power **without retraining or expanding the architecture**, we can build an **ensemble** of PINNs by calling HyPINO multiple times.

But instead of naively sampling multiple networks, we use the **residuals** of the current prediction to guide each new network in correcting the errors of the previous one. This **iterative refinement** strategy reduced the prediction error by more than **100×** on some benchmarks, using just 3–10 refinement steps.

### Procedure

1. Generate an initial prediction $u^{(0)}$ using HyPINO.  
2. Compute residuals of the governing PDE and boundary conditions.  
3. Feed the residuals back into HyPINO to obtain a corrective network $\delta u^{(t)}$.  
4. Update the solution as $u^{(t+1)} = u^{(t)} + \delta u^{(t)}$. 

Repeating this process yields an ensemble of corrective PINNs whose cumulative output progressively reduces the residual error **without retraining** or backpropagation through the hypernetwork.

In [None]:
import sys, os

project_root = os.path.abspath("..")
sys.path.append(project_root)

In [None]:
import os
import torch
import numpy as np

from src.models import HyPINO
from src.data.utils import to_tensor
from src.data.utils import plot_grids, encode_pde_str

In [None]:
if torch.cuda.is_available():
    device = f'cuda:{torch.cuda.current_device()}'
else:
    device = 'cpu'

In [None]:
model = HyPINO.load_from_safetensors('../models/hypino.safetensors').to(device).eval()

## Example with Helmholtz equation
Load example data for the Helmholtz benchmark PDE used in the paper. See the `02_inference.ipynb` notebook for explanation on the input types for HyPINO.

Note that this example does not have Neumann boundary conditions.

In [None]:
inputs_path = '../assets/helmholtz/arrays'

# inputs
dirichlet_mask = np.load(os.path.join(inputs_path, 'dirichlet_mask.npy'))
dirichlet_conditions = np.load(os.path.join(inputs_path, 'dirichlet_conditions.npy'))
source_function = np.load(os.path.join(inputs_path, 'source_function.npy'))

# if available, load reference solution
reference_solution = np.load(os.path.join(inputs_path, 'reference_solution.npy'))

plot_grids([dirichlet_mask, dirichlet_conditions, source_function, reference_solution], 
           titles=['Dirichlet mask', 'Dirichlet boundary cond', 'Source function', 'Reference solution'])

In [None]:
mat_inputs = np.stack([dirichlet_mask, np.zeros_like(dirichlet_mask),
                   dirichlet_conditions, np.zeros_like(dirichlet_conditions),
                   source_function], axis=0)
mat_inputs_tensor = to_tensor(mat_inputs)

In [None]:
diff_operator = 'uxx + uyy + u'
pde_coeffs = encode_pde_str(diff_operator)
pde_coeffs_tensor = to_tensor([c for c in pde_coeffs.values()])

In [None]:
sample = {
    'pde_coeffs': pde_coeffs_tensor.to(device),
    'mat_inputs': mat_inputs_tensor.to(device),
    'neu_normals': torch.zeros(2, 224, 224, device=device, dtype=mat_inputs_tensor.dtype), # no neumann conditions, fill with zeros
    'pde_str': diff_operator,
}

In [None]:
ensemble_pinn = model.iterative_refinement(sample, num_iter=10, plot_progress=True)

The `iterative_refinement` method returns a PyTorch Module, so you can directly use all PyTorch-specific operations:

In [None]:
torch.save(ensemble_pinn, 'helmholtz_ensemble.pth')
ensemble_pinn = torch.load('helmholtz_ensemble.pth')

The `forward` method of `ensemble_pinn` expects a `(N, 2)` tensor of `(x, y)` coordinates, where `N` is the number of collocation points and can be freely chosen.

In [None]:
# generate a grid of 2D collocation points on the domain [-1, 1]^2
x_grid, y_grid = np.meshgrid(
    np.linspace(-1,  1, 224),
    np.linspace( 1, -1, 224),
)
x = to_tensor(x_grid, requires_grad=True).reshape(-1, 1).to(device)
y = to_tensor(y_grid, requires_grad=True).reshape(-1, 1).to(device)
xy = torch.cat([x, y], dim=-1)

# predict PDE solutions at collocation points
u_pred = ensemble_pinn(xy.unsqueeze(0))[0]
u_pred_grid = u_pred.detach().cpu().numpy().reshape(224, 224)

# plot against reference solution
plot_grids([
    u_pred_grid,
    reference_solution,
    u_pred_grid - reference_solution,
    ], titles=['Predicted solution', 'Reference solution', 'Difference'])

## Example with inner boundaries on Poisson equation
Load example data for the Poisson circles benchmark PDE used in the paper. See the `02_inference.ipynb` notebook for explanation on the input types for HyPINO.

In [None]:
inputs_path = '../assets/poisson_C/arrays'

# inputs
dirichlet_mask = np.load(os.path.join(inputs_path, 'dirichlet_mask.npy'))
dirichlet_conditions = np.load(os.path.join(inputs_path, 'dirichlet_conditions.npy'))
source_function = np.load(os.path.join(inputs_path, 'source_function.npy'))

# domain mask for computing residual
domain_mask = np.load(os.path.join(inputs_path, 'domain_mask.npy'))

# if available, load reference solution
reference_solution = np.load(os.path.join(inputs_path, 'reference_solution.npy'))

plot_grids([dirichlet_mask, dirichlet_conditions, source_function, domain_mask, reference_solution], 
           titles=['Dirichlet mask', 'Dirichlet boundary cond', 'Source function', 'Domain mask', 'Reference solution'])

In [None]:
mat_inputs = np.stack([dirichlet_mask, np.zeros_like(dirichlet_mask),
                   dirichlet_conditions, np.zeros_like(dirichlet_conditions),
                   source_function], axis=0)
mat_inputs_tensor = to_tensor(mat_inputs)

In [None]:
diff_operator = '-uxx - uyy'
pde_coeffs = encode_pde_str(diff_operator)
pde_coeffs_tensor = to_tensor([c for c in pde_coeffs.values()])

In [None]:
sample = {
    'pde_coeffs': pde_coeffs_tensor.to(device),
    'mat_inputs': mat_inputs_tensor.to(device),
    'neu_normals': torch.zeros(2, 224, 224, device=device, dtype=mat_inputs_tensor.dtype),
    'pde_str': diff_operator,
    'domain_mask': to_tensor(domain_mask).to(device) # add domain mask to avoid computing residual inside inner boundaries
}

In [None]:
ensemble_pinn = model.iterative_refinement(sample, num_iter=10, plot_progress=True)

In [None]:
torch.save(ensemble_pinn, 'poisson_C_ensemble.pth')

## Example with Wave equation
Load example data for the wave benchmark PDE used in the paper. See the `02_inference.ipynb` notebook for explanation on the input types for HyPINO.

In [None]:
inputs_path = '../assets/wave/arrays'

# inputs
dirichlet_mask = np.load(os.path.join(inputs_path, 'dirichlet_mask.npy'))
dirichlet_conditions = np.load(os.path.join(inputs_path, 'dirichlet_conditions.npy'))
neumann_mask = np.load(os.path.join(inputs_path, 'neumann_mask.npy'))
neumann_conditions = np.load(os.path.join(inputs_path, 'neumann_conditions.npy'))
source_function = np.load(os.path.join(inputs_path, 'source_function.npy'))

# if available, load reference solution
reference_solution = np.load(os.path.join(inputs_path, 'reference_solution.npy'))

# load neumann normals for computing correct boundary losses
if os.path.exists(os.path.join(inputs_path, 'neumann_normals.npy')):
        neumann_normals = np.load(os.path.join(inputs_path, 'neumann_normals.npy'))
else:
        neumann_normals = np.zeros((2, 224, 224))

plot_grids([dirichlet_mask, dirichlet_conditions, neumann_mask, neumann_conditions, neumann_normals[0], neumann_normals[1], source_function, reference_solution], 
           titles=['Dirichlet mask', 'Dirichlet boundary cond', 'Neumann mask', 
                   'Neumann boundary cond', 'Neumann normals x', 'Neumann normals y', 
                   'Source function', 'Reference solution'])

In [None]:
mat_inputs = np.stack([dirichlet_mask, neumann_mask,
                   dirichlet_conditions, neumann_conditions,
                   source_function], axis=0)
mat_inputs_tensor = to_tensor(mat_inputs)

In [None]:
diff_operator = '0.5 * uyy - 2 * uxx'
pde_coeffs = encode_pde_str(diff_operator)
pde_coeffs_tensor = to_tensor([c for c in pde_coeffs.values()])

In [None]:
sample = {
    'pde_coeffs': pde_coeffs_tensor.to(device),
    'mat_inputs': mat_inputs_tensor.to(device),
    'neu_normals': to_tensor(neumann_normals).to(device),
    'pde_str': diff_operator,
}

In [None]:
pinn_ensemble = model.iterative_refinement(sample, num_iter=10, plot_progress=True)

In [None]:
torch.save(ensemble_pinn, 'wave_ensemble.pth')

### Experiment with different ensemble weightings
The default setting is that every $\delta u^{(t)}$ contributes equally to the final solution. We found that this is a hyperparameter that can have an effect on the final performance of the ensemble. Therefore, we provide the `ensemble_weighting` argument, taking a list of floats representing the weight $\lambda_t$ with which the corresponding $\delta u^{(t)}$ should be multiplied in the ensemble $u^{(t+1)} = u^{(t)} + \lambda_t\delta u^{(t)}$. 


In [None]:
pinn_ensemble = model.iterative_refinement(sample, num_iter=10, plot_progress=True, 
                                           ensemble_weighting=[2., 1.5, 1.5, 1.25, 1.25, 1.1, 1.1, 1, 1, 1])