In [11]:
from kan import KAN, LBFGS
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from torch import autograd
from tqdm import tqdm
import time
import json
import os

In [12]:
if torch.cuda.is_available():
  dev = "cuda:0"
else:
  dev = "cpu"
device = torch.device(dev)

In [13]:
lambda_b       = 10.0
lambda_ic      =10.0

steps = 20
alpha = 0.1
log = 1

global loss_int_hist, loss_bc_hist, loss_ic_hist, pred_hist, xdim

N = 50
xdim = 3
pred_hist      = np.zeros(N)

model_shape = [xdim+1, 10, 10, 1]

model = KAN(width=model_shape, grid=5, k=3, grid_eps=1.0, noise_scale_base=0.25, device=device)
optimizer = LBFGS(model.parameters(), lr=1, history_size=10, line_search_fn="strong_wolfe", tolerance_grad=1e-32, tolerance_change=1e-32, tolerance_ys=1e-32)


In [14]:
def loss_fun(x_int, x_bc, model):
    mu = 1
    
    x_int = x_int.to(device)
    x_bc = x_bc.to(device)
    
    x, t = x_int[:, :-1], x_int[:, -1:]
    x = x.to(device)
    t = t.to(device)
    t.requires_grad_()
    x.requires_grad_()

    # Combine x and t to create input tensor for the model
    input_tensor = torch.cat((x, t), dim=1).to(device)
    u = model(input_tensor).to(device)

    def model_output(u):
        return u

    # Initialize tensors for derivatives
    du_dt = torch.zeros_like(u, device=device)
    du_dx = torch.zeros_like(x, device=device)
    d2u_dx2 = torch.zeros_like(x, device=device)

    # First-order time derivative
    du_dt = torch.autograd.grad(u, t, grad_outputs=torch.ones_like(t, device=device), create_graph=True)[0]
    
    # First-order spatial derivative
    du_dx = torch.autograd.grad(u, x, grad_outputs=torch.ones_like(u, device=device), retain_graph=True, allow_unused=True)[0]

    # Second-order spatial derivative    
    d2u_dx2 = torch.autograd.functional.jacobian(lambda x: du_dx.sum(), x)


    # Compute the residual R_int
    R_int = torch.mean((du_dt.squeeze(1) + torch.sum(d2u_dx2, dim=1) - mu * torch.sum(du_dx ** 2, dim=1)) ** 2)

    # Boundary condition handling
    x_bc, t_bc = x_bc[:, :-1], x_bc[:, -1:]
    x_bc = x_bc.to(device)
    t_bc = t_bc.to(device)
    
    t_bc.requires_grad_()
    input_tensor_bc = torch.cat((x_bc, t_bc), dim=1).to(device)
    u_bc = model(input_tensor_bc)

    R_bc = torch.mean(torch.square(u_bc - torch.log((1 + torch.norm(x_bc, p=2) ** 2) / 2)))

    return R_int, R_bc

In [15]:
def get_data():
    global T
    T = 1

    tensor1 = torch.randn((50, xdim), device=device)
    tensor2 = torch.rand((50, 1), device=device)*T
    x_int = torch.cat([tensor1, tensor2], dim=1)
    
    tensor1 = torch.randn((50, xdim), device=device)
    tensor2 = torch.ones((50, 1), device=device)*T
    x_bc = torch.cat([tensor1, tensor2], dim=1)
    
    return x_int, x_bc

In [16]:
def train(steps):

    loss_int_hist  = np.zeros(steps)
    loss_bc_hist    = np.zeros(steps)
    # loss_ic_hist    = np.zeros(steps)
    
    pbar = tqdm(range(steps), desc='description')
    # optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-7)

    for epoch in pbar:
        def closure():
            global loss_int, loss_bc, x_int
            # zero the gradient buffers
            optimizer.zero_grad()
            x_int, x_bc = get_data()
            x_int = x_int.to(device)
            x_bc = x_bc.to(device)

            # x_int = x_int.reshape(-1, 1)
            # x_bc = x_bc.reshape(-1, 1)
            
            # print(x_int.shape)
            # print(x_bc.shape)

            # compute losses
            loss_int, loss_bc = loss_fun(x_int, x_bc, model)
            loss = lambda_ic*loss_int + lambda_b*loss_bc

            # compute gradients of training loss
            loss.backward()
            
            return loss
        
        x_int, x_bc = get_data()
        # print(x_int.shape)
        # print(x_bc.shape)

#         if epoch % 5 == 0 and epoch < 50:
#             model.update_grid_from_samples(x_int)

        optimizer.step(closure)
        loss = loss_int + lambda_b*loss_bc

        if epoch % log == 0:
            pbar.set_description("interior pde loss: %.2e | bc loss: %.2e " % (loss_int.cpu().detach().numpy(), loss_bc.cpu().detach().numpy()))
        # print(f'   --- epoch {epoch+1}: loss_int = {loss_int.item():.4e}, loss_bc = {loss_bc.item():.4e}, loss_ic = {loss_ic.item():.4e}')
        
        # save loss
        loss_int_hist[epoch] = loss_int
        loss_bc_hist[epoch] = loss_bc

    return loss_int_hist, loss_bc_hist

In [17]:
# Measure execution time
start_time = time.time()

int_losses, bc_losses = train(steps)

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Training completed in {elapsed_time:.2f} seconds.")

description:   0%|          | 0/20 [00:00<?, ?it/s]

interior pde loss: 3.39e-02 | bc loss: 8.06e-02 : 100%|██████████| 20/20 [00:42<00:00,  2.11s/it]

Training completed in 42.29 seconds.





In [18]:
def convert_seconds(seconds):
    minutes = seconds // 60
    remaining_seconds = seconds % 60
    return f"{minutes} mins {remaining_seconds} secs"

formatted_time = convert_seconds(elapsed_time)

data = {
    "Model Shape": model_shape,
    "Input Dimension": xdim,
    "Epochs/Steps": steps,
    "Final Interior PDE Loss": int_losses[-1],
    "Final Boundary Condition Loss": bc_losses[-1],
    "Runtime Duration": formatted_time
}

# Define the folder and file path
parent_folder = os.path.abspath(os.path.join(os.getcwd(), os.pardir))  # Get the parent directory
burgers_kan_folder = os.path.join(parent_folder, "results", "HJB_KAN")
data_folder = os.path.join(burgers_kan_folder, "data")
file_path = os.path.join(data_folder, "data.json")

# Create the folders if they don't exist
os.makedirs(data_folder, exist_ok=True)

# Initialize data_list
data_list = []

# Check if the JSON file exists and is not empty
if os.path.exists(file_path):
    try:
        with open(file_path, "r") as file:
            # Attempt to read the existing data
            data_list = json.load(file)
    except json.JSONDecodeError:
        # If there's an error, print a warning and continue with an empty list
        print("Warning: data.json is empty or corrupted. Starting with an empty list.")
        data_list = []

# Append the new data
data_list.append(data)

# Write the updated list of dictionaries back to the JSON file
with open(file_path, "w") as file:
    json.dump(data_list, file, indent=4)

In [19]:
# List of losses and their names for plotting
loss_lists = [int_losses, bc_losses]
loss_names = ["int_losses", "bc_losses"]

# Define the figure title
figure_title = "Training Losses"

# Create the subplots
fig, axs = plt.subplots(1, 2, figsize=(12, 6))
fig.suptitle(figure_title, fontsize=20)

# Plot each loss list
for i, (loss_list, loss_name) in enumerate(zip(loss_lists, loss_names)):
    axs[i].plot(loss_list, marker='o')
    axs[i].set_xlabel("epoch/step")
    axs[i].set_ylabel("loss value")
    axs[i].set_title(loss_name)
    axs[i].grid(True)

# Adjust layout to prevent overlap
fig.tight_layout(rect=[0, 0, 1, 0.95])  # Leave space for the main title

# Define the folder path in the parent directory
parent_folder = os.path.abspath(os.path.join(os.getcwd(), os.pardir))  # Get the parent directory
training_plots_folder = os.path.join(parent_folder, "results", "HJB_KAN", "training_plots")

# Create the training_plots folder if it doesn't exist
os.makedirs(training_plots_folder, exist_ok=True)

# Join the elements of the array with underscores to create the file name
file_name = "_".join(map(str, model_shape)) + f"_HJB_{xdim}_{steps}" + ".jpeg"

# Save the figure in the training_plots folder
save_path = os.path.join(training_plots_folder, file_name)
plt.savefig(save_path, format='jpeg')
plt.close(fig)  # Close the figure to free up memory