In [41]:
#!pip install nangs
#!pip install torchdiffeq

In [42]:
import numpy as np 
import scipy
import matplotlib.pyplot as plt 
import torch
import nangs
import math 
import sympy as sp

device = "cuda" if torch.cuda.is_available() else "cpu"
device = "cpu"
#nangs.__version__, torch.__version__
torch.tensor([0., 0.]).to(device)

tensor([0., 0.])

In [43]:
# setup duffing oscillator x_dot = f(x)

# compute linearization and eigenvalues
A = np.array([[0,1],[-1,-0.5]])
E,W = scipy.linalg.eig(np.array(A.T))
# Er,Wr = scipy.linalg.cdf2rdf(E, W) # realify in case

w1_real = W[:,0].real; w1_imag = W[:,0].imag
w2_real = W[:,1].real; w2_imag = W[:,1].imag

eig1_real = E.real[0]; eig2_real = E.real[1]
eig1_imag = E.imag[0]; eig2_imag = E.imag[1]

# convert to torch tensors
A = torch.tensor(A).reshape(2,2).to(device)
w1_real = torch.tensor(w1_real).to(device); w1_imag = torch.tensor(w1_imag).to(device)
eig1_real = torch.tensor(eig1_real).to(device); eig1_imag = torch.tensor(eig1_imag).to(device)

# padd extra dimension of dot products
w1_real = w1_real[:,None]; w1_imag = w1_imag[:,None]

In [44]:
# define x_dot = f(x)
# def f(t,x):
#     if not torch.is_tensor(x):
#         print('x is not a tensor')
#         print(x[1,:])
#         x = torch.tensor(x).cuda()
#     x1_dot = x[1]
#     x2_dot = -0.5*x[1] - (x[0]**2 - 1)*x[0]
#     return torch.stack((x1_dot,x2_dot))

# find the nonlinear part fn(x)
# def fn(t,x):
#     if not torch.is_tensor(x):
#         x = torch.tensor(x).cuda()
#     #return f(t,x) - torch.mm(A,x.double())
#     return f(t,x) - torch.mm(A,x)

# def g_real(t,x,w):
#     return torch.mm(w.T,fn(t,x))

# def g_imag(t,x,w):    
#     return torch.mm(w.T,fn(t,x))

In [45]:
def f(t,x,y):
    print(x)
    if not torch.is_tensor(x):
        print('x is not a tensor') 
        x = torch.tensor(x).to(device)
    x_dot = y
    y_dot = -0.5*y - (x**2 - 1)*x
    return torch.stack((x_dot,y_dot))

def fn(t,x,y):
    if not torch.is_tensor(x):
        x = torch.tensor(x).to(device)
    return torch.mm(A,torch.vstack((x.double(),y.double())))
    #return f(t,x,y) - torch.mm(A,torch.vstack((x,y)))

def g_real(t,x,y,w):
    return torch.mm(w.T,fn(t,x,y))

def g_imag(t,x,y,w):    
    return torch.mm(w.T,fn(t,x,y))

In [46]:
x0 = torch.tensor([[0],[0]]).to(device)
t = torch.tensor([0]).to(device)
# g_real(t,x0[0],x0[1],w1_real)
f(t,x0[1],x0[0])

tensor([0])


tensor([[0.],
        [-0.]])

In [47]:
#Define the eigenfunction PDE

from nangs import PDE

class Eigen(PDE):
    def computePDELoss(self, inputs, outputs):
                
        # compute gradients
        grads = self.computeGrads(outputs, inputs)
        
        # compute loss
        dpdx, dpdy = grads[:, 0], grads[:, 1]
        x, y = inputs[:, 0], inputs[:, 1]
        p = outputs
        t = torch.tensor([0]) # steady state values
        u, v = f(t, x, y)
        g = g_real(t, x, y, w1_real)
        eig = eig1_real
        return {'pde': -eig*p + u*dpdx + v*dpdy + g}

# instantiate pde
pde = Eigen(inputs=('x', 'y'), outputs='p')

In [48]:
x0 = torch.tensor([[0],[0]]).to(device)
t = torch.tensor([0]).to(device)
pde.inputs, pde.outputs

(('x', 'y'), ('p',))

# Prepare training data

In [49]:
# define the sampler

from nangs import RandomSampler

sampler = RandomSampler({
    'x': [-0.45, 0.45], 
    'y': [-0.45, 0.45],
}, device=device, n_samples=2000)
pde.set_sampler(sampler)

In [50]:
sampler.sample(5)

{'x': tensor([ 0.3308, -0.2341,  0.4295,  0.2360,  0.3085]),
 'y': tensor([-0.0090,  0.1402,  0.0316,  0.0184,  0.1239])}

Test stacking

In [51]:
# test stacking
# x0 = torch.tensor([[0],[0]], dtype=double).cuda()
# t = torch.tensor([0]).cuda()
# torch.vstack((f(t,x0), torch.exp(-eig1_real*t)*g_real(t,x0,w1_real)))

Test odeint

In [52]:
# test ode int
# from torchdiffeq import odeint
# H = lambda t,X : torch.vstack((f(t,torch.vstack((x0[0],x0[1]))), torch.exp(-eig1_real*t)*g_real(t,torch.vstack((x0[0],x0[1])),w1_real)))
# t_boundary = torch.tensor([0.]).cuda()

# sol = odeint(H, torch.vstack((x0[0],x0[1], torch.zeros(1).cuda())), t_boundary)
# sol[-1,2]

Test Initial conditions

In [53]:
# from torchdiffeq import odeint
# from nangs import Dirichlet

# H = lambda t,X : torch.vstack((f(t,torch.vstack((x0[0],x0[1]))), torch.exp(-eig1_real*t)*g_real(t,torch.vstack((x0[0],x0[1])),w1_real)))
# t_boundary = torch.tensor([0.,2.]).cuda()

# initial_condition1 = Dirichlet(
#     RandomSampler({'x': [-0.45, 0.45], 'y': 0.45}, device=device, n_samples=100), 
#     lambda inputs: {'p' : odeint(H, torch.vstack((inputs['x'].reshape(-1,1), inputs['y'].reshape(-1,1), torch.zeros(1).cuda())), t_boundary)[-1,2]},
#     name="test"
# )

# pde.add_boco(initial_condition1)

In [54]:
# initial_condition1.sample(5)


In [55]:
# from torchdiffeq import odeint
# from nangs import Dirichlet

# H = lambda t,X : torch.vstack((f(t,torch.vstack((x0[0],x0[1]))), torch.exp(-eig1_real*t)*g_real(t,torch.vstack((x0[0],x0[1])),w1_real)))
# t_boundary = torch.tensor([0.,2.]).cuda()

# initial_condition1 = Dirichlet(
#     RandomSampler({'x': [-0.45, 0.45], 'y': 0.45}, device=device, n_samples=100), 
#     lambda inputs: {'p' : odeint(H, torch.vstack((inputs['x'].reshape(-1,1), inputs['y'].reshape(-1,1), torch.zeros(1).cuda())), t_boundary)[-1,2]},
#     name="edge1"
# )

# initial_condition2 = Dirichlet(
#     RandomSampler({'x': [-0.45, 0.45], 'y': -0.45}, device=device, n_samples=100), 
#     lambda inputs: {'p' : odeint(H, torch.vstack((inputs['x'].reshape(-1,1), inputs['y'].reshape(-1,1), torch.zeros(1).cuda())), t_boundary)[-1,2]},
#     name="edge2"
# )

# initial_condition3 = Dirichlet(
#     RandomSampler({'x': 0.45, 'y': [-0.45, 0.45]}, device=device, n_samples=100), 
#     lambda inputs: {'p' : odeint(H, torch.vstack((inputs['x'].reshape(-1,1), inputs['y'].reshape(-1,1), torch.zeros(1).cuda())), t_boundary)[-1,2]},
#     name="edge3"
# )

# initial_condition4 = Dirichlet(
#     RandomSampler({'x': -0.45, 'y': [-0.45, 0.45]}, device=device, n_samples=100), 
#     lambda inputs: {'p' : odeint(H, torch.vstack((inputs['x'].reshape(-1,1), inputs['y'].reshape(-1,1), torch.zeros(1).cuda())), t_boundary)[-1,2]},
#     name="edge4"
# )

# pde.add_boco(initial_condition1)
# pde.add_boco(initial_condition2)
# pde.add_boco(initial_condition3)
# pde.add_boco(initial_condition4)

In [56]:
# initial_condition1.sample(7)

# Train

In [57]:
import torch.nn as nn

class Sine(nn.Module):
    def __init__(self):
        super().__init__()
    def forward(self, x):
        return torch.sin(x)
        

mlp = nn.Sequential(
	nn.Linear(2, 100),
	Sine(), 
	nn.Linear(100, 100), 
	Sine(), 
	nn.Linear(100, 1)
)

In [58]:
# solve

from nangs import MLP

LR = 1e-2
N_STEPS = 2000
NUM_LAYERS = 3
NUM_HIDDEN = 128

mlp = MLP(len(pde.inputs), len(pde.outputs), NUM_LAYERS, NUM_HIDDEN).to(device)
optimizer = torch.optim.Adam(mlp.parameters())
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=LR, pct_start=0.1, div_factor=10, final_div_factor=1, total_steps=N_STEPS)


In [59]:
mlp

MLP(
  (mlp): Sequential(
    (0): Linear(in_features=2, out_features=128, bias=True)
    (1): Sequential(
      (0): Sine()
      (1): Linear(in_features=128, out_features=128, bias=True)
    )
    (2): Sequential(
      (0): Sine()
      (1): Linear(in_features=128, out_features=128, bias=True)
    )
    (3): Sequential(
      (0): Sine()
      (1): Linear(in_features=128, out_features=1, bias=True)
    )
  )
)

In [60]:
pde.compile(mlp, optimizer, scheduler)
%time hist = pde.solve(N_STEPS)

  0%|          | 0/2000 [00:00<?, ?it/s]

tensor([-0.0371,  0.1421,  0.0749,  ...,  0.2816,  0.2369, -0.4089],
       grad_fn=<SelectBackward0>)





RuntimeError: Found dtype Float but expected Double

In [None]:
# plot loss history
import pandas as pd 

df = pd.DataFrame(hist)
fig = plt.figure(dpi=100)
ax = plt.subplot(1,1,1)
ax.set_yscale('log')
df.plot(ax=ax, grid=True)

#**Evaluate**

In [None]:
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
from matplotlib import cm

x = np.linspace(-0.45,0.45,100)
y = np.linspace(-0.45,0.45,100)

_x, _y = np.meshgrid(x, y)

grid = np.stack(np.meshgrid(x, y), -1).reshape(-1, 2)
X = torch.from_numpy(grid).float().to(device)
p = pde.eval(X)
p = p.cpu().numpy()

In [None]:
X.requires_grad_(True)
p = mlp(X)
grads = pde.computeGrads(p,X)
dpdx, dpdy = grads[:, 0].detach().numpy().reshape((len(_y),len(_x))), grads[:, 1].detach().numpy().reshape((len(_y),len(_x)))
p = p.detach().numpy().reshape((len(_y),len(_x)))
u,v = f(_x,_y)
g = g1(_x,_y)
error = -lam*p + u*dpdx.reshape((len(_y),len(_x))) + v*dpdy.reshape((len(_y),len(_x))) + g

# Visualize

In [None]:
import plotly.graph_objects as go
import pandas as pd
import numpy as np

"""
# Matplotlib not pretty 
fig = plt.figure(figsize=(15,5))
ax = fig.add_subplot(1, 2, 1, projection='3d')
ax.plot_surface(_x, _y, p + w1[0]*_x + w1[1]*_y, cmap=cm.coolwarm, linewidth=0, antialiased=False)
ax.plot_surface(_x, _y, -(w1[0]/2)*p2(_x,_y), cmap=cm.spring, linewidth=0, antialiased=False)
plt.show()
"""

fig = go.Figure(data=[go.Surface(z=(-w1[0]/2)*(2*_x**3+3*_y**3), x=_x, y=_y, showscale=False, opacity=0.3),
                      go.Surface(z=p, x=_x, y=_y) ])
fig.update_layout(title='$Nonlinear \,part:\; 2x^3 + 3y^3$', autosize=False,
                  width=600, height=500,
                  margin=dict(l=15, r=20, b=15, t=30))

fig.update_layout(scene_aspectmode='cube')
fig.show()


In [None]:
# Save if all looks A-OK !
from google.colab import files
torch.save(mlp.state_dict(),'checkpoint.pth')
files.download('checkpoint.pth')