# PDE Solving

In [None]:
import jax
import jax.numpy as jnp

from src.pdes import *
from src.utils import *
from src.std_kan import StdKAN

from jaxkan.KAN import KAN

import optax
from flax import nnx

import os

# Directory that collects one results file per experiment.
# exist_ok=True makes re-running this cell harmless when the directory is already there.
results_dir = "pde_results"
os.makedirs(results_dir, exist_ok=True)

## Grid Search Parameters

In [None]:
# PDE name -> residual function; the training cells call it as pde_res(model, pde_collocs)
pde_dict = {"allen-cahn": ac_res, "burgers": burgers_res, "helmholtz": helmholtz_res}

# Passed to get_collocs(pde_name, N_points) when building the collocation data
N_points = 2**6

# RBA weight update used in every loss_fn:
#   new = RBA_gamma * old + RBA_eta * |residual| / max(|residual|)
RBA_gamma = 0.999
RBA_eta = 0.01

# Base seed; each run builds its model with seed + run
seed = 42

# Number of train_step calls per trained model
num_epochs = 5000

# Adam with a fixed learning rate; wrapped in a new nnx.Optimizer for every model
opt_type = optax.adam(learning_rate=0.001)

# Search grid: G is forwarded as the 'G' entry of the layer parameters,
# and each network uses `depth` hidden layers of size `width` ([width]*depth)
G_values = [5, 10, 20]
widths = [2, 4, 8, 16, 32, 64]
depths = [1, 2, 3, 4]

## Baseline Results

In [None]:
# Results of this experiment go to <results_dir>/baseline.txt
experiment_name = "baseline"
results_file = os.path.join(results_dir, experiment_name + ".txt")

# Column names, in the order the training loop writes them
header = "pde, G, width, depth, run, loss, l2"

# Start the file with the header row; an existing file is left untouched
if not os.path.exists(results_file):
    with open(results_file, "w") as file:
        file.write(f"{header}\n")

In [None]:
# Baseline sweep: one training run per (PDE, G, depth, width, run) combination
for pde_name, pde_res in pde_dict.items():
    print(f"Running Experiments for {pde_name} equation.")

    def loss_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Compute the RBA-weighted training loss for the current PDE.

        Both terms follow the same recipe: the previous weights are scaled by
        ``RBA_gamma`` and increased by ``RBA_eta`` times the absolute residual
        divided by its maximum; the term's loss is the mean of the squared
        weighted residuals.

        Returns:
            ``(total_loss, (l_E_new, l_B_new))`` where the second item holds
            the updated PDE and BC weights.
        """
        # PDE term
        res_pde = pde_res(model, pde_collocs)
        mag_pde = jnp.abs(res_pde)
        l_E_new = (RBA_gamma*l_E) + (RBA_eta*mag_pde/jnp.max(mag_pde))
        pde_loss = jnp.mean((l_E_new * res_pde)**2)

        # BC term
        res_bc = model(bc_collocs) - bc_data
        mag_bc = jnp.abs(res_bc)
        l_B_new = (RBA_gamma*l_B) + (RBA_eta*mag_bc/jnp.max(mag_bc))
        bc_loss = jnp.mean((l_B_new * res_bc)**2)

        return pde_loss + bc_loss, (l_E_new, l_B_new)

    @nnx.jit
    def train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Run one optimizer update; return the loss and the updated RBA weights."""
        grad_fn = nnx.value_and_grad(loss_fn, has_aux = True)
        (loss, new_weights), grads = grad_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data)

        optimizer.update(grads)

        l_E_new, l_B_new = new_weights
        return loss, l_E_new, l_B_new

    # Reference solution and the coordinates the model is evaluated on
    refsol, coords = get_ref(pde_name)

    # Collocation data for the PDE and BC terms
    pde_collocs, bc_collocs, bc_data = get_collocs(pde_name, N_points)

    # Input/output sizes are read off the data
    n_in = pde_collocs.shape[1]
    n_out = bc_data.shape[1]

    for G in G_values:
        print(f"\tUsing G = {G}.")

        for depth in depths:
            for width in widths:

                # `depth` hidden layers, each of size `width`
                hidden = [width]*depth
                layer_dims = [n_in] + hidden + [n_out]

                req_params = {
                    'k': 3,
                    'G': G,
                    'grid_range': (-1.0, 1.0),
                    'grid_e': 1.0,
                    'residual': nnx.silu,
                    'external_weights': True,
                    'add_bias': True,
                    'init_scheme': {'type': 'default'},
                }

                print(f"\t\tTraining model with dimensions {layer_dims}.")

                for run in range(1, 6):

                    # RBA weights start at one for every collocation point
                    l_E = jnp.ones((pde_collocs.shape[0], 1))
                    l_B = jnp.ones((bc_collocs.shape[0], 1))

                    # Fresh model and optimizer per run; the seed differs between runs
                    model = KAN(layer_dims = layer_dims, layer_type = 'spline', required_parameters = req_params, seed = seed+run)
                    optimizer = nnx.Optimizer(model, opt_type)

                    # The RBA weights are carried from one step to the next
                    for epoch in range(num_epochs):
                        train_loss, l_E, l_B = train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data)

                    # Relative L2 error against the reference solution
                    output = model(coords).reshape(refsol.shape)
                    l2error = jnp.linalg.norm(output-refsol)/jnp.linalg.norm(refsol)

                    # One row per run, appended right away so finished runs are kept
                    new_row = f"{pde_name}, {G}, {width}, {depth}, {run}, {train_loss}, {l2error}"
                    with open(results_file, "a") as rfile:
                        rfile.write(f"{new_row}\n")

                    print(f"\t\t\t{run}. Final loss: {train_loss:.2e} \tRel. L2 Error: {l2error:.2e}")

## LeCun-like Results

In [None]:
# Results of this experiment go to <results_dir>/lecun.txt
experiment_name = "lecun"
results_file = os.path.join(results_dir, experiment_name + ".txt")

# Column names, in the order the training loop writes them
header = "pde, G, width, depth, run, loss, l2"

# Start the file with the header row; an existing file is left untouched
if not os.path.exists(results_file):
    with open(results_file, "w") as file:
        file.write(f"{header}\n")

In [None]:
# LeCun-like sweep: one training run per (PDE, G, depth, width, run) combination
for pde_name, pde_res in pde_dict.items():
    print(f"Running Experiments for {pde_name} equation.")

    def loss_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Compute the RBA-weighted training loss for the current PDE.

        Both terms follow the same recipe: the previous weights are scaled by
        ``RBA_gamma`` and increased by ``RBA_eta`` times the absolute residual
        divided by its maximum; the term's loss is the mean of the squared
        weighted residuals.

        Returns:
            ``(total_loss, (l_E_new, l_B_new))`` where the second item holds
            the updated PDE and BC weights.
        """
        # PDE term
        res_pde = pde_res(model, pde_collocs)
        mag_pde = jnp.abs(res_pde)
        l_E_new = (RBA_gamma*l_E) + (RBA_eta*mag_pde/jnp.max(mag_pde))
        pde_loss = jnp.mean((l_E_new * res_pde)**2)

        # BC term
        res_bc = model(bc_collocs) - bc_data
        mag_bc = jnp.abs(res_bc)
        l_B_new = (RBA_gamma*l_B) + (RBA_eta*mag_bc/jnp.max(mag_bc))
        bc_loss = jnp.mean((l_B_new * res_bc)**2)

        return pde_loss + bc_loss, (l_E_new, l_B_new)

    @nnx.jit
    def train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Run one optimizer update; return the loss and the updated RBA weights."""
        grad_fn = nnx.value_and_grad(loss_fn, has_aux = True)
        (loss, new_weights), grads = grad_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data)

        optimizer.update(grads)

        l_E_new, l_B_new = new_weights
        return loss, l_E_new, l_B_new

    # Reference solution and the coordinates the model is evaluated on
    refsol, coords = get_ref(pde_name)

    # Collocation data for the PDE and BC terms
    pde_collocs, bc_collocs, bc_data = get_collocs(pde_name, N_points)

    # Input/output sizes are read off the data
    n_in = pde_collocs.shape[1]
    n_out = bc_data.shape[1]

    for G in G_values:
        print(f"\tUsing G = {G}.")

        for depth in depths:
            for width in widths:

                # `depth` hidden layers, each of size `width`
                hidden = [width]*depth
                layer_dims = [n_in] + hidden + [n_out]

                req_params = {
                    'k': 3,
                    'G': G,
                    'grid_range': (-1.0, 1.0),
                    'grid_e': 1.0,
                    'residual': nnx.silu,
                    'external_weights': True,
                    'add_bias': True,
                    'init_scheme': {'type': 'lecun', 'gain': None, 'distribution':'uniform'},
                }

                print(f"\t\tTraining model with dimensions {layer_dims}.")

                for run in range(1, 6):

                    # RBA weights start at one for every collocation point
                    l_E = jnp.ones((pde_collocs.shape[0], 1))
                    l_B = jnp.ones((bc_collocs.shape[0], 1))

                    # Fresh model and optimizer per run; the seed differs between runs
                    model = KAN(layer_dims = layer_dims, layer_type = 'spline', required_parameters = req_params, seed = seed+run)
                    optimizer = nnx.Optimizer(model, opt_type)

                    # The RBA weights are carried from one step to the next
                    for epoch in range(num_epochs):
                        train_loss, l_E, l_B = train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data)

                    # Relative L2 error against the reference solution
                    output = model(coords).reshape(refsol.shape)
                    l2error = jnp.linalg.norm(output-refsol)/jnp.linalg.norm(refsol)

                    # One row per run, appended right away so finished runs are kept
                    new_row = f"{pde_name}, {G}, {width}, {depth}, {run}, {train_loss}, {l2error}"
                    with open(results_file, "a") as rfile:
                        rfile.write(f"{new_row}\n")

                    print(f"\t\t\t{run}. Final loss: {train_loss:.2e} \tRel. L2 Error: {l2error:.2e}")

## Glorot-like Results

In [None]:
# Results of this experiment go to <results_dir>/glorot.txt
experiment_name = "glorot"
results_file = os.path.join(results_dir, experiment_name + ".txt")

# Column names, in the order the training loop writes them
header = "pde, G, width, depth, run, loss, l2"

# Start the file with the header row; an existing file is left untouched
if not os.path.exists(results_file):
    with open(results_file, "w") as file:
        file.write(f"{header}\n")

In [None]:
# Glorot-like sweep: one training run per (PDE, G, depth, width, run) combination
for pde_name, pde_res in pde_dict.items():
    print(f"Running Experiments for {pde_name} equation.")

    def loss_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Compute the RBA-weighted training loss for the current PDE.

        Both terms follow the same recipe: the previous weights are scaled by
        ``RBA_gamma`` and increased by ``RBA_eta`` times the absolute residual
        divided by its maximum; the term's loss is the mean of the squared
        weighted residuals.

        Returns:
            ``(total_loss, (l_E_new, l_B_new))`` where the second item holds
            the updated PDE and BC weights.
        """
        # PDE term
        res_pde = pde_res(model, pde_collocs)
        mag_pde = jnp.abs(res_pde)
        l_E_new = (RBA_gamma*l_E) + (RBA_eta*mag_pde/jnp.max(mag_pde))
        pde_loss = jnp.mean((l_E_new * res_pde)**2)

        # BC term
        res_bc = model(bc_collocs) - bc_data
        mag_bc = jnp.abs(res_bc)
        l_B_new = (RBA_gamma*l_B) + (RBA_eta*mag_bc/jnp.max(mag_bc))
        bc_loss = jnp.mean((l_B_new * res_bc)**2)

        return pde_loss + bc_loss, (l_E_new, l_B_new)

    @nnx.jit
    def train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Run one optimizer update; return the loss and the updated RBA weights."""
        grad_fn = nnx.value_and_grad(loss_fn, has_aux = True)
        (loss, new_weights), grads = grad_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data)

        optimizer.update(grads)

        l_E_new, l_B_new = new_weights
        return loss, l_E_new, l_B_new

    # Reference solution and the coordinates the model is evaluated on
    refsol, coords = get_ref(pde_name)

    # Collocation data for the PDE and BC terms
    pde_collocs, bc_collocs, bc_data = get_collocs(pde_name, N_points)

    # Input/output sizes are read off the data
    n_in = pde_collocs.shape[1]
    n_out = bc_data.shape[1]

    for G in G_values:
        print(f"\tUsing G = {G}.")

        for depth in depths:
            for width in widths:

                # `depth` hidden layers, each of size `width`
                hidden = [width]*depth
                layer_dims = [n_in] + hidden + [n_out]

                req_params = {
                    'k': 3,
                    'G': G,
                    'grid_range': (-1.0, 1.0),
                    'grid_e': 1.0,
                    'residual': nnx.silu,
                    'external_weights': True,
                    'add_bias': True,
                    'init_scheme': {'type': 'glorot', 'gain': None, 'distribution':'uniform', 'sample_size': 10000},
                }

                print(f"\t\tTraining model with dimensions {layer_dims}.")

                for run in range(1, 6):

                    # RBA weights start at one for every collocation point
                    l_E = jnp.ones((pde_collocs.shape[0], 1))
                    l_B = jnp.ones((bc_collocs.shape[0], 1))

                    # Fresh model and optimizer per run; the seed differs between runs
                    model = KAN(layer_dims = layer_dims, layer_type = 'spline', required_parameters = req_params, seed = seed+run)
                    optimizer = nnx.Optimizer(model, opt_type)

                    # The RBA weights are carried from one step to the next
                    for epoch in range(num_epochs):
                        train_loss, l_E, l_B = train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data)

                    # Relative L2 error against the reference solution
                    output = model(coords).reshape(refsol.shape)
                    l2error = jnp.linalg.norm(output-refsol)/jnp.linalg.norm(refsol)

                    # One row per run, appended right away so finished runs are kept
                    new_row = f"{pde_name}, {G}, {width}, {depth}, {run}, {train_loss}, {l2error}"
                    with open(results_file, "a") as rfile:
                        rfile.write(f"{new_row}\n")

                    print(f"\t\t\t{run}. Final loss: {train_loss:.2e} \tRel. L2 Error: {l2error:.2e}")

## Custom Standardization Results

In [None]:
# Results of this experiment go to <results_dir>/std.txt
experiment_name = "std"
results_file = os.path.join(results_dir, experiment_name + ".txt")

# Column names, in the order the training loop writes them
header = "pde, G, width, depth, run, loss, l2"

# Start the file with the header row; an existing file is left untouched
if not os.path.exists(results_file):
    with open(results_file, "w") as file:
        file.write(f"{header}\n")

In [None]:
# Custom-standardization sweep: one StdKAN training run per
# (PDE, G, depth, width, run) combination
for pde_name, pde_res in pde_dict.items():
    print(f"Running Experiments for {pde_name} equation.")

    def loss_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Compute the RBA-weighted training loss for the current PDE.

        Both terms follow the same recipe: the previous weights are scaled by
        ``RBA_gamma`` and increased by ``RBA_eta`` times the absolute residual
        divided by its maximum; the term's loss is the mean of the squared
        weighted residuals.

        Returns:
            ``(total_loss, (l_E_new, l_B_new))`` where the second item holds
            the updated PDE and BC weights.
        """
        # PDE term
        res_pde = pde_res(model, pde_collocs)
        mag_pde = jnp.abs(res_pde)
        l_E_new = (RBA_gamma*l_E) + (RBA_eta*mag_pde/jnp.max(mag_pde))
        pde_loss = jnp.mean((l_E_new * res_pde)**2)

        # BC term
        res_bc = model(bc_collocs) - bc_data
        mag_bc = jnp.abs(res_bc)
        l_B_new = (RBA_gamma*l_B) + (RBA_eta*mag_bc/jnp.max(mag_bc))
        bc_loss = jnp.mean((l_B_new * res_bc)**2)

        return pde_loss + bc_loss, (l_E_new, l_B_new)

    @nnx.jit
    def train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Run one optimizer update; return the loss and the updated RBA weights."""
        grad_fn = nnx.value_and_grad(loss_fn, has_aux = True)
        (loss, new_weights), grads = grad_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data)

        optimizer.update(grads)

        l_E_new, l_B_new = new_weights
        return loss, l_E_new, l_B_new

    # Reference solution and the coordinates the model is evaluated on
    refsol, coords = get_ref(pde_name)

    # Collocation data for the PDE and BC terms
    pde_collocs, bc_collocs, bc_data = get_collocs(pde_name, N_points)

    # Input/output sizes are read off the data
    n_in = pde_collocs.shape[1]
    n_out = bc_data.shape[1]

    for G in G_values:
        print(f"\tUsing G = {G}.")

        for depth in depths:
            for width in widths:

                # `depth` hidden layers, each of size `width`
                hidden = [width]*depth
                layer_dims = [n_in] + hidden + [n_out]

                # No 'type' entry in init_scheme here: these parameters are meant for StdKAN
                req_params = {
                    'k': 3,
                    'G': G,
                    'grid_range': (-1.0, 1.0),
                    'grid_e': 1.0,
                    'residual': nnx.silu,
                    'external_weights': True,
                    'add_bias': True,
                    'init_scheme': {'gain': None, 'distribution':'uniform'},
                }

                print(f"\t\tTraining model with dimensions {layer_dims}.")

                for run in range(1, 6):

                    # RBA weights start at one for every collocation point
                    l_E = jnp.ones((pde_collocs.shape[0], 1))
                    l_B = jnp.ones((bc_collocs.shape[0], 1))

                    # StdKAN is the model under test in this experiment. It must not be
                    # rebound to a plain KAN afterwards, otherwise the results logged to
                    # the "std" file would come from the wrong architecture.
                    model = StdKAN(layer_dims = layer_dims, required_parameters = req_params, seed = seed+run)
                    optimizer = nnx.Optimizer(model, opt_type)

                    # The RBA weights are carried from one step to the next
                    for epoch in range(num_epochs):
                        train_loss, l_E, l_B = train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data)

                    # Relative L2 error against the reference solution
                    output = model(coords).reshape(refsol.shape)
                    l2error = jnp.linalg.norm(output-refsol)/jnp.linalg.norm(refsol)

                    # One row per run, appended right away so finished runs are kept
                    new_row = f"{pde_name}, {G}, {width}, {depth}, {run}, {train_loss}, {l2error}"
                    with open(results_file, "a") as rfile:
                        rfile.write(f"{new_row}\n")

                    print(f"\t\t\t{run}. Final loss: {train_loss:.2e} \tRel. L2 Error: {l2error:.2e}")

## Empirical Power Law Results

In [None]:
# Exponent grid 0.0, 0.25, ..., 2.0 fed to the 'pow_b1'/'pow_b2' entries of the init scheme
pows_basis = [0.25 * step for step in range(9)]
# Same grid for the 'pow_r1'/'pow_r2' entries, kept as an independent list
pows_res = list(pows_basis)

In [None]:
# Results of this experiment go to <results_dir>/power.txt
experiment_name = "power"
results_file = os.path.join(results_dir, experiment_name + ".txt")

# Column names, in the order the training loop writes them
header = "pde, G, width, depth, pow_basis, pow_res, run, loss, l2"

# Start the file with the header row; an existing file is left untouched
if not os.path.exists(results_file):
    with open(results_file, "w") as file:
        file.write(f"{header}\n")

In [None]:
# Power-law sweep: one training run per
# (PDE, G, depth, width, pow_basis, pow_res, run) combination
for pde_name, pde_res in pde_dict.items():
    print(f"Running Experiments for {pde_name} equation.")

    def loss_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Compute the RBA-weighted training loss for the current PDE.

        Both terms follow the same recipe: the previous weights are scaled by
        ``RBA_gamma`` and increased by ``RBA_eta`` times the absolute residual
        divided by its maximum; the term's loss is the mean of the squared
        weighted residuals.

        Returns:
            ``(total_loss, (l_E_new, l_B_new))`` where the second item holds
            the updated PDE and BC weights.
        """
        # PDE term
        res_pde = pde_res(model, pde_collocs)
        mag_pde = jnp.abs(res_pde)
        l_E_new = (RBA_gamma*l_E) + (RBA_eta*mag_pde/jnp.max(mag_pde))
        pde_loss = jnp.mean((l_E_new * res_pde)**2)

        # BC term
        res_bc = model(bc_collocs) - bc_data
        mag_bc = jnp.abs(res_bc)
        l_B_new = (RBA_gamma*l_B) + (RBA_eta*mag_bc/jnp.max(mag_bc))
        bc_loss = jnp.mean((l_B_new * res_bc)**2)

        return pde_loss + bc_loss, (l_E_new, l_B_new)

    @nnx.jit
    def train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data):
        """Run one optimizer update; return the loss and the updated RBA weights."""
        grad_fn = nnx.value_and_grad(loss_fn, has_aux = True)
        (loss, new_weights), grads = grad_fn(model, l_E, l_B, pde_collocs, bc_collocs, bc_data)

        optimizer.update(grads)

        l_E_new, l_B_new = new_weights
        return loss, l_E_new, l_B_new

    # Reference solution and the coordinates the model is evaluated on
    refsol, coords = get_ref(pde_name)

    # Collocation data for the PDE and BC terms
    pde_collocs, bc_collocs, bc_data = get_collocs(pde_name, N_points)

    # Input/output sizes are read off the data
    n_in = pde_collocs.shape[1]
    n_out = bc_data.shape[1]

    for G in G_values:
        print(f"\tUsing G = {G}.")

        for depth in depths:
            for width in widths:

                # `depth` hidden layers, each of size `width`
                hidden = [width]*depth
                layer_dims = [n_in] + hidden + [n_out]

                print(f"\t\tTraining model with dimensions {layer_dims}.")

                for pow_basis in pows_basis:

                    for pow_res in pows_res:

                        # Both basis exponents share pow_basis; both residual exponents share pow_res
                        req_params = {
                            'k': 3,
                            'G': G,
                            'grid_range': (-1.0, 1.0),
                            'grid_e': 1.0,
                            'residual': nnx.silu,
                            'external_weights': True,
                            'add_bias': True,
                            'init_scheme': {
                                'type': 'power',
                                "const_b": 1.0,
                                "const_r": 1.0,
                                "pow_b1": pow_basis,
                                "pow_b2": pow_basis,
                                "pow_r1": pow_res,
                                "pow_r2": pow_res,
                            },
                        }

                        print(f"\t\t\tWorking with pow_basis = {pow_basis} and pow_res = {pow_res}.")

                        for run in range(1, 4):

                            # RBA weights start at one for every collocation point
                            l_E = jnp.ones((pde_collocs.shape[0], 1))
                            l_B = jnp.ones((bc_collocs.shape[0], 1))

                            # Fresh model and optimizer per run; the seed differs between runs
                            model = KAN(layer_dims = layer_dims, layer_type = 'spline', required_parameters = req_params, seed = seed+run)
                            optimizer = nnx.Optimizer(model, opt_type)

                            # The RBA weights are carried from one step to the next
                            for epoch in range(num_epochs):
                                train_loss, l_E, l_B = train_step(model, optimizer, l_E, l_B, pde_collocs, bc_collocs, bc_data)

                            # Relative L2 error against the reference solution
                            output = model(coords).reshape(refsol.shape)
                            l2error = jnp.linalg.norm(output-refsol)/jnp.linalg.norm(refsol)

                            # One row per run, appended right away so finished runs are kept
                            new_row = f"{pde_name}, {G}, {width}, {depth}, {pow_basis}, {pow_res}, {run}, {train_loss}, {l2error}"
                            with open(results_file, "a") as rfile:
                                rfile.write(f"{new_row}\n")

                            print(f"\t\t\t\t{run}. Final loss: {train_loss:.2e} \tRel. L2 Error: {l2error:.2e}")