In [39]:
import numpy as np
import torch
import torch.nn as nn
import neuralop
import matplotlib.pyplot as plt
import symengine as se
import pandas as pd
from tqdm.auto import tqdm
from copy import deepcopy

In [2]:
def random_transform(f, x):
    return f((50*float(np.random.random() - 0.5)) * x + (float(np.random.random() - 0.5))) + (float(np.random.random() - 0.5))

In [3]:
def random_polynom(x):
    degree = np.random.randint(1, 5)
    expression = (float(np.random.random() - 0.5)) * x**degree
    degree -= 1
    while degree >= 0:
        expression += 0.001*(float(np.random.random() - 0.5)) * x**degree
        degree -= 1
    return expression

In [353]:
def random_log(x):
    return (float(np.random.random() - 0.5)) * se.log(x + 1 + (float(np.random.random())))
    
def random_sqrt(x):
    return (float(np.random.random() - 0.5)) * se.sqrt(x + 1 + (float(np.random.random())))

In [354]:
def random_sum(l):
    res = []
    for f1 in l:
        res.append((float(np.random.random() - 0.5)) * f1 + (float(np.random.random() - 0.5)) * np.random.choice(l))
    return res

In [355]:
def random_product(l):
    res = []
    for f1 in l:
        res.append((float(np.random.random() - 0.5)) * f1 * (float(np.random.random() - 0.5)) * np.random.choice(l))
    return res

In [396]:
def random_function(x, num_points, length):
    maximum = 10
    minimum = -10
    while maximum >= 10 or minimum <= -10 or np.abs(np.max(f_eval)-np.min(f_eval)) <= 10e-2:
        bases = np.random.choice([random_transform(se.sin, x),
                                  random_transform(se.cos, x),
                                  random_transform(se.sinh, x),
                                  random_transform(se.cosh, x),
                                  random_log(x),
                                  random_sqrt(x),
                                  random_polynom(x)], size=np.random.randint(1, 5))
        f = bases[0]
        for i in range(1, len(bases)):
            if np.random.random() < 0.5:
                f = random_sum([f, bases[i]])[0]
            else:
                f = random_product([f, bases[i]])[0]
        df = se.diff(f, x)
        start = np.random.random()-1
        end = start + length
        points = np.sort(np.random.uniform(start, end, size=num_points))
        f_eval = np.array([float(f.subs({x: p})) for p in points]) + np.random.normal(0, 1e-2, size=num_points)
        df_eval = np.array([float(df.subs({x: p})) for p in points])
        maximum = np.max([np.max(f_eval), np.max(df_eval)])
        minimum = np.min([np.min(f_eval), np.min(df_eval)])
    return f, df, points, f_eval, df_eval

In [398]:
total_size = int(2**14)
length = 3
num_points = 512

In [399]:
GENERATING = True

if GENERATING:
    print("Generating", total_size, "samples")
    x = se.Symbol('x')

    functions, derivatives, points, functions_eval, derivatives_eval = [], [], [], [], []
    for _ in tqdm(range(total_size)):
        f, df, p, f_e, df_e = random_function(x, num_points, length)
        functions.append(f)
        derivatives.append(df)
        points.append(p)
        functions_eval.append(f_e)
        derivatives_eval.append(df_e)
    X = np.array(list(zip(functions_eval, points)))
    Y = np.array(derivatives_eval)
    np.save("X.npy", X)
    np.save("Y.npy", Y)
else:
    X, Y = np.load("X.npy"), np.load("Y.npy")
    total_size, _, num_points = X.shape

Generating 16384 samples


  0%|          | 0/16384 [00:00<?, ?it/s]

In [400]:
train_size = int(X.shape[0]*0.5)
val_size = int(X.shape[0]*0.25)
test_size = X.shape[0] - train_size - val_size

In [401]:
x_train, y_train = torch.tensor(X[:train_size], dtype=torch.float32), torch.tensor(Y[:train_size], dtype=torch.float32)
x_val, y_val = torch.tensor(X[train_size:train_size+val_size], dtype=torch.float32), torch.tensor(Y[train_size:train_size+val_size], dtype=torch.float32)
x_test, y_test = torch.tensor(X[train_size+val_size:], dtype=torch.float32), torch.tensor(Y[train_size+val_size:], dtype=torch.float32)

In [402]:
maxi_0 = x_train[:, 0].max()
mini_0 = x_train[:, 0].min()
maxi_1 = x_train[:, 1].max()
mini_1 = x_train[:, 1].min()

x_train[:, 0] = (x_train[:, 0]-mini_0)/(maxi_0-mini_0)
x_train[:, 1] = (x_train[:, 1]-mini_1)/(maxi_1-mini_1)

x_val[:, 0] = (x_val[:, 0]-mini_0)/(maxi_0-mini_0)
x_val[:, 1] = (x_val[:, 1]-mini_1)/(maxi_1-mini_1)

x_test[:, 0] = (x_test[:, 0]-mini_0)/(maxi_0-mini_0)
x_test[:, 1] = (x_test[:, 1]-mini_1)/(maxi_1-mini_1)

In [403]:
max_y, min_y = torch.max(y_train), torch.min(y_train)
y_train = (y_train-min_y)/(max_y-min_y)
y_val = (y_val-min_y)/(max_y-min_y)
y_test = (y_test-min_y)/(max_y-min_y)

In [404]:
n_epochs = 128
batch_size = 64

In [405]:
class Model(nn.Module):
    def __init__(self, shape):
        super(Model, self).__init__()
        self.l1 = nn.Linear(shape*2, 512)
        self.l2 = nn.Linear(512, 256)
        self.l3 = nn.Linear(256, 512)
        self.l4 = nn.Linear(512, shape)
        
        self.relu = nn.ReLU()

    def forward(self, inputs):
        x = self.relu(self.l1(inputs.reshape((inputs.shape[0], -1))))
        x = self.relu(self.l2(x))
        x = self.relu(self.l3(x))
        x = self.l4(x)
        return x

torch.manual_seed(42)
model1 = Model(num_points).to('mps')
best_model1 = deepcopy(model1)
print("Number of parameters:", np.sum([len(i) for i in model1.parameters()]))

Number of parameters: 3584


In [406]:
criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(model1.parameters(), lr=0.0005)
losses_nn = []
for epoch in (pbar:=tqdm(range(n_epochs))):
    model1.train()
    optimizer.zero_grad()
    for b in range(0, x_train.shape[0], batch_size):
        predictions = model1(x_train[b: b+batch_size].to('mps')).to('cpu')
        loss = criterion(predictions, y_train[b: b+batch_size])
        loss.backward()
        optimizer.step()
    model1.eval()
    losses_nn.append(criterion(model1(x_val.to('mps')).to('cpu'), y_val).item())
    pbar.set_description(f"{losses_nn[-1]}")
    if losses_nn[-1] == min(losses_nn):
        best_model1 = deepcopy(model1)

  0%|          | 0/128 [00:00<?, ?it/s]

In [407]:
print("Final test loss:", criterion(model1(x_test.to('mps')).to('cpu'), y_test).item())
print("Best test loss:", criterion(best_model1(x_test.to('mps')).to('cpu'), y_test).item())

Final test loss: 0.010781707242131233
Best test loss: 0.010394170880317688


In [418]:
class Model2(nn.Module):
    def __init__(self):
        super(Model2, self).__init__()
        self.fc = nn.Sequential(
            neuralop.models.FNO1d(n_modes_height=128, n_layers=8, in_channels=2, out_channels=1, hidden_channels=8),
            #neuralop.models.UNO(in_channels=2, out_channels=1, hidden_channels=4, uno_out_channels=[4,4,4,4], uno_n_modes=[[64],[64],[64],[64]], 
            #                    uno_scalings=[[1],[1],[1],[1]])
        )

    def forward(self, x):
        x = x.squeeze(1)
        return self.fc(x)

torch.manual_seed(42)
model2 = Model2().to('mps')
best_model2 = deepcopy(model2)
print("Number of parameters:", np.sum([len(i) for i in model2.parameters()]))

Number of parameters: 1178


In [None]:
criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(model2.parameters(), lr=0.0005)
losses = []
for epoch in (pbar:=tqdm(range(n_epochs))):
    model2.train()
    optimizer.zero_grad()
    for b in range(0, x_train.shape[0], batch_size):
        predictions = model2(x_train[b: b+batch_size].unsqueeze(1).to('mps')).to('cpu')
        loss = criterion(predictions.squeeze(), y_train[b: b+batch_size])
        loss.backward()
        optimizer.step()
    model2.eval()
    losses.append(criterion(model2(x_test.unsqueeze(1).to('mps')).to('cpu').squeeze(), y_test).item())
    pbar.set_description(f"{losses[-1]}")
    if losses[-1] == min(losses):
        best_model2 = deepcopy(model2)

  0%|          | 0/128 [00:00<?, ?it/s]

In [None]:
print("Final test loss:", criterion(model2.to('cpu')(x_test).squeeze(), y_test).item())
print("Best test loss:", criterion(best_model2.to('cpu')(x_test).squeeze(), y_test).item())

In [None]:
model1.eval()
model2.eval()
r = 4
for i in [r, r+1]:
    with torch.no_grad():
        y_hat_nn, y_hat_no = best_model1(x_test[i:i+1].to('mps')).to('cpu').squeeze(), best_model2(x_test[i:i+1]).squeeze()
    y_hat_np = np.diff((x_test[i, 0]+mini_0)*(maxi_0-mini_0))/(length/num_points)
    y_hat_np = (y_hat_np-min_y.item())/(max_y.item()-min_y.item())
    fig, ax = plt.subplots(1, 3, figsize=(18, 4))
    ax[0].plot(x_test[i, 1], y_test[i], linestyle="--", label='truth')
    ax[1].plot(x_test[i, 1], y_test[i], linestyle="--", label='truth')
    ax[2].plot(x_test[i, 1], y_test[i], linestyle="--", label='truth')
    ax[0].plot(x_test[i, 1], y_hat_nn, label='nn')
    ax[1].plot(x_test[i, 1], y_hat_no, label='no')
    ax[2].plot(x_test[i, 1, 1:], y_hat_np, label='np')
    ax[0].grid('on'), ax[1].grid('on'), ax[2].grid('on')
    ax[0].set_title('Neural Network'), ax[1].set_title('Neural Operator'), ax[2].set_title('Numerical differentiation')
    ax[0].legend(), ax[1].legend(), ax[2].legend()
    plt.show()
    i += 1

In [None]:
plt.figure(figsize=(15, 8))
plt.plot(losses_nn, label='nn')
plt.plot(losses, label='no')
plt.title(f"Neural Network ({sum([sum(p.shape) for p in model1.parameters()])} parameters): {np.mean(losses_nn[-20:]) : .7f}\n" +
          f"Neural Operator ({sum([sum(p.shape) for p in model2.parameters()])} parameters): {np.mean(losses[-20:]) : .7f}")
plt.yscale('log')
plt.grid('on')
plt.legend()
plt.show()