In [None]:
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt

import torch
from collections import OrderedDict
import numpy as np
import matplotlib.pyplot as plt
import scipy.io
from scipy.interpolate import griddata
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib.gridspec as gridspec
import warnings
from scipy.integrate import odeint
from matplotlib import style
import numpy as np

warnings.filterwarnings('ignore')

# Seed every random number generator the notebook relies on: NumPy drives the
# synthetic measurement noise, PyTorch drives the network weight
# initialisation. Seeding only NumPy leaves the trained model different on
# every "Restart & Run All".
np.random.seed(1234)
torch.manual_seed(1234)

# CUDA support: use the GPU when one is visible, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"Device is {device}")
"""## Physics-informed Neural Networks"""


# vectorize_time_dep_param = np.vectorize(time_dep_param)




def logistic_growth(N, t, r, K):
    """Right-hand side of the logistic growth ODE, dN/dt = r * N * (1 - N / K).

    The (state, time, *params) argument order matches how this function is
    handed to ``odeint`` further down; ``t`` itself is not used because the
    equation has no explicit time dependence.

    Args:
        N: current population (scalar or array).
        t: current time (ignored).
        r: growth rate.
        K: carrying capacity.

    Returns:
        The time derivative of ``N``.
    """
    remaining_capacity = 1 - N / K
    return r * N * remaining_capacity


# Step 1. Generate Data
print("Step 1. Generate Data")

max_t = 30        # end of the simulated time window (exclusive, see np.arange)
step_size = 1.0   # spacing between consecutive measurement times
noise = 0.03      # noise standard deviation as a fraction of the mean signal

t = np.arange(0, max_t, step_size) #0.1
p_ = (0.3, 1.2)   # ground-truth (r, K) used to simulate the measurements

ic = 0.1          # initial population N(0)
# X is ideal data
X = odeint(logistic_growth, ic, t, args=p_)

# Add noise according to the mean
xbar = np.mean(X, axis=0)
Xn = X + noise * xbar * np.random.randn(*X.shape)

# Step 2. Setup Training
print("Step 2. Set Up Training")

t_train = np.expand_dims(t.copy(),axis=1)
# Evaluation grid at half the measurement spacing. The step is taken as a
# plain Python float: indexing t_train[1] - t_train[0] yields a size-1 array,
# and feeding that to np.arange relies on implicit array-to-scalar conversion.
half_step = float(t_train[1, 0] - t_train[0, 0]) / 2
t_test = np.arange(t_train.min(), t_train.max() + half_step, half_step)
# Dense grid of collocation times on which the ODE residual is evaluated.
t_colloc = np.expand_dims(np.arange(0, max_t, 0.001),axis=1)
y_train = Xn.copy() # changed, used to be X

# plot the data for sanity check
plt.grid()
plt.title("Trajectories")
plt.plot(t_train, y_train[:, 0], 'o',label='cell count',
            color='k')
plt.legend()
plt.savefig('data')
plt.clf()

# Observed ranges of the training inputs and outputs (the bounds needed for
# min-max scaling to [0, 1]).
tm, tM = t_train.min(), t_train.max()
ym, yM = y_train.min(axis=0), y_train.max(axis=0)

# the deep neural network
class DNN(torch.nn.Module):
    """Fully connected tanh network with built-in min-max scaling of its input.

    Args:
        layers: sequence of layer widths ``[n_in, hidden_1, ..., n_out]``.
            Every layer except the last is followed by ``Tanh``.
        min_val: lower bound of the raw input, mapped to 0 by the scaling.
        max_val: upper bound of the raw input, mapped to 1 by the scaling.
    """

    def __init__(self, layers, min_val, max_val):
        super(DNN, self).__init__()

        # parameters
        self.depth = len(layers) - 1

        # The scaling bounds are constants, not trainable quantities. Storing
        # them as float32 buffers means they (a) accept integer bounds without
        # the "only floating point tensors can require gradients" error,
        # (b) follow the module through .to(device) instead of being pinned to
        # a global device, and (c) are saved with the state_dict.
        self.register_buffer('min_val', torch.tensor([min_val], dtype=torch.float32))
        self.register_buffer('max_val', torch.tensor([max_val], dtype=torch.float32))

        # set up layer order dict
        self.activation = torch.nn.Tanh

        layer_list = list()
        for i in range(self.depth - 1):
            layer_list.append(
                ('layer_%d' % i, torch.nn.Linear(layers[i], layers[i+1]))
            )
            layer_list.append(('activation_%d' % i, self.activation()))

        # Output layer: linear, no activation.
        layer_list.append(
            ('layer_%d' % (self.depth - 1), torch.nn.Linear(layers[-2], layers[-1]))
        )
        layerDict = OrderedDict(layer_list)
        print(layerDict)
        # deploy layers
        self.layers = torch.nn.Sequential(layerDict)

    def forward(self, x):
        """Scale ``x`` with the stored bounds and run it through the layer stack."""
        res = (x - self.min_val) / (self.max_val - self.min_val)
        out = self.layers(res)
        return out

# Schedule end points, expressed as base-10 exponents.
max_lr = -1
min_lr = -3

time_delta = 30000   # epoch at which annealing is complete
warm_ups = 1000      # number of initial epochs held at the maximum value
# Slope of the exponent while annealing. The decay runs over the
# (time_delta - warm_ups) epochs after the warm-up; dividing by time_delta
# alone would stop short of min_lr and leave a jump at epoch == time_delta.
rate = (min_lr - max_lr) / (time_delta - warm_ups)

def lr_schedule(epoch):
    """Warm-up followed by log-linear annealing.

    Returns ``10**max_lr`` for ``epoch < warm_ups``, then lets the exponent
    fall linearly until it reaches ``min_lr`` at ``epoch == time_delta``, and
    returns ``10**min_lr`` from there on.

    When used as ``lr_lambda`` of ``torch.optim.lr_scheduler.LambdaLR`` the
    returned value is a multiplicative factor applied to the optimizer's
    base learning rate, not an absolute learning rate.
    """
    if epoch < warm_ups:
        return 10**(max_lr)
    elif epoch < time_delta:
        return 10**(rate * (epoch - warm_ups) + max_lr)
    else:
        return 10**min_lr

# the physics-guided neural network
class PhysicsInformedNN():
    """PINN that fits a trajectory and the logistic-growth parameters (r, K) jointly.

    The training loss is the sum of
      * a data term: MSE between the network output at the measurement times
        and the measurements, and
      * a physics term: MSE of the residual du/dt - r*u*(1 - u/K) evaluated at
        the collocation times.
    ``r`` and ``K`` are trainable parameters registered on the network, so
    both optimizers update them together with the network weights.

    Relies on the module-level names ``device``, ``DNN`` and ``lr_schedule``.
    """

    def __init__(self, t, t_colloc, u_data, p_, layers, lb, ub):
        """Build the network, the trainable ODE parameters and the optimizers.

        Args:
            t: measurement times (array-like; a column of shape (n, 1) in this
                notebook).
            t_colloc: collocation times at which the ODE residual is enforced.
            u_data: measurements at the times ``t``.
            p_: initial guess ``(r, K)`` for the ODE parameters.
            layers: layer widths passed to ``DNN``.
            lb, ub: accepted for interface compatibility; not used. The input
                scaling bounds are taken from the range of ``t``.
        """
        # data -- only the collocation times need gradients (for du/dt)
        self.t = self._to_tensor(t)
        self.t_colloc = self._to_tensor(t_colloc, requires_grad=True)
        self.u_data = self._to_tensor(u_data)

        # Trainable ODE parameters. An explicit float dtype keeps integer
        # initial guesses (e.g. (1, 2)) from failing.
        r, K = p_
        self.r = torch.nn.Parameter(torch.tensor([r], dtype=torch.float32, device=device))
        self.K = torch.nn.Parameter(torch.tensor([K], dtype=torch.float32, device=device))

        # deep neural networks
        print(f"layers: {layers}")
        # Scale inputs by the range of the measurement times handed to this
        # object instead of reaching for module-level globals.
        t_lo = self.t.min().item()
        t_hi = self.t.max().item()
        self.dnn = DNN(layers, t_lo, t_hi).to(device)
        # Registering r and K on the network makes dnn.parameters() yield them.
        self.dnn.register_parameter('r', self.r)
        self.dnn.register_parameter('K', self.K)

        # optimizers: using the same settings
        self.optimizer = torch.optim.LBFGS(
            self.dnn.parameters(),
            lr=1.0,
            max_iter=50000,
            max_eval=50000,
            history_size=50,
            tolerance_grad=0,#1e-8,
            tolerance_change=0,#1.0 * np.finfo(float).eps,
            line_search_fn="strong_wolfe"       # can be "strong_wolfe"
        )

        # NOTE(review): LambdaLR multiplies the optimizer's base lr (Adam's
        # default here) by lr_schedule(epoch); confirm that a relative factor,
        # not an absolute learning rate, is what is intended.
        self.optimizer_Adam = torch.optim.Adam(self.dnn.parameters())
        self.scheduler = torch.optim.lr_scheduler.LambdaLR(self.optimizer_Adam, lr_lambda=lr_schedule)
        self.iter = 0  # number of L-BFGS closure evaluations so far

    @staticmethod
    def _to_tensor(values, requires_grad=False):
        """Copy ``values`` into a fresh float32 tensor on the global ``device``."""
        out = torch.as_tensor(values, dtype=torch.float32, device=device).detach().clone()
        return out.requires_grad_(requires_grad)

    def net_u(self, t):
        """Network prediction u(t)."""
        u = self.dnn(t)
        return u

    def net_f(self, t, t_colloc, u_data):
        """ The pytorch autograd version of calculating residual

        Returns the mean squared ODE residual at ``t_colloc``, which must be
        part of the autograd graph (requires grad). ``t`` and ``u_data`` are
        not used.
        """
        u_colloc = self.net_u(t_colloc)

        # Keep the first output column as an (n, 1) tensor.
        _x = torch.unsqueeze(u_colloc[:, 0],axis=1)

        # du/dt at the collocation points. create_graph=True keeps the
        # derivative differentiable so the residual can be back-propagated.
        x_t = torch.autograd.grad(
            _x, t_colloc,
            grad_outputs=torch.ones_like(_x),
            retain_graph=True,
            create_graph=True,
        )[0]

        x_dot = self.r * _x * (1 - _x/self.K)
        loss_pinn = torch.mean(torch.square(x_t - x_dot))

        return loss_pinn

    def _losses(self):
        """Return ``(data_loss, pinn_loss)`` for the current network state."""
        u_pred = self.net_u(self.t)
        pinn_loss = self.net_f(self.t, self.t_colloc, self.u_data)
        data_loss = torch.mean(torch.square(self.u_data - u_pred))
        return data_loss, pinn_loss

    def loss_func(self):
        """Closure for L-BFGS: zero grads, compute the loss, back-propagate."""
        self.optimizer.zero_grad()

        data_loss, pinn_loss = self._losses()
        loss = data_loss + pinn_loss

        loss.backward()

        self.iter += 1
        if self.iter % 100 == 0:
            print(
                'Loss: %e, data_loss: %e, pinn_loss: %e, r: %.3f, K: %.6f' %
                (
                    loss.item(),
                    data_loss.item(),
                    pinn_loss.item(),
                    self.r.item(),
                    self.K.item()
                )
            )
        return loss

    def train(self, adam_epochs, bfgs_epochs, polish_adam_epochs):
        """Run ``adam_epochs`` Adam steps followed by one L-BFGS solve.

        Args:
            adam_epochs: number of Adam iterations (the scheduler is stepped
                once per iteration).
            bfgs_epochs: accepted but currently unused; the L-BFGS budget is
                the max_iter / max_eval given to the optimizer in __init__.
            polish_adam_epochs: accepted but currently unused; no polish phase
                is run.
        """
        self.dnn.train()

        for epoch in range(adam_epochs):
            loss_mse, loss_pinn = self._losses()
            loss = loss_mse + loss_pinn

            # Backward and optimize
            self.optimizer_Adam.zero_grad()
            loss.backward()
            self.optimizer_Adam.step()
            self.scheduler.step()

            if epoch % 100 == 0:
                print(
                    'It: %d, Loss: %.3e , loss_mse: %.3e,  r: %.3f, K: %.6f' %
                    (
                        epoch,
                        loss.item(),
                        loss_mse.item(),
                        self.r.item(),
                        self.K.item()
                    )
                )
        self.optimizer.step(self.loss_func)

    def predict(self, t):
        """Evaluate the network at times ``t`` and return a NumPy array.

        ``t`` may be array-like or a tensor. It is moved to the device the
        network lives on, and no autograd graph is built. The network is left
        in eval mode.
        """
        net_device = next(self.dnn.parameters()).device
        t = torch.as_tensor(t, dtype=torch.float32, device=net_device)

        self.dnn.eval()
        with torch.no_grad():
            u = self.net_u(t)
        return u.detach().cpu().numpy()

"""## Configurations"""

N_u = 2000  # not referenced by the code visible in this notebook
layers = [1, 20, 20, 20, 20, 20, 20, 20, 20, 1]

"""## Training on Noisy Data"""

# Commented out IPython magic to ensure Python compatibility.
# %%time

adam_epochs = 5000
polish_adam_epochs = 2000
bfgs_epochs = 2000

# training -- y_train holds the noisy measurements; (0.5, 0.5) is the initial
# guess for (r, K)
model = PhysicsInformedNN(t_train, t_colloc, y_train, (0.5,0.5), layers, 0, max_t)
model.train(adam_epochs, bfgs_epochs, polish_adam_epochs)

# evaluations -- pass the NumPy array of measurement times rather than the
# model's internal tensor
u_pred = model.predict(t_train)

plt.title("Trajectories")
plt.plot(t_train, y_train[:, 0], 'o',label='Measurements',
          color='k')
plt.plot(t_train, u_pred[:, 0],'-', label='PINN approximation', color='r')
# The legend has to be drawn before saving, otherwise the file on disk has none.
plt.legend()
plt.savefig('trajectories')
plt.clf()

Device is cuda
Step 1. Generate Data
Step 2. Set Up Training
layers: [1, 20, 20, 20, 20, 20, 20, 20, 20, 1]
OrderedDict({'layer_0': Linear(in_features=1, out_features=20, bias=True), 'activation_0': Tanh(), 'layer_1': Linear(in_features=20, out_features=20, bias=True), 'activation_1': Tanh(), 'layer_2': Linear(in_features=20, out_features=20, bias=True), 'activation_2': Tanh(), 'layer_3': Linear(in_features=20, out_features=20, bias=True), 'activation_3': Tanh(), 'layer_4': Linear(in_features=20, out_features=20, bias=True), 'activation_4': Tanh(), 'layer_5': Linear(in_features=20, out_features=20, bias=True), 'activation_5': Tanh(), 'layer_6': Linear(in_features=20, out_features=20, bias=True), 'activation_6': Tanh(), 'layer_7': Linear(in_features=20, out_features=20, bias=True), 'activation_7': Tanh(), 'layer_8': Linear(in_features=20, out_features=1, bias=True)})
It: 0, Loss: 7.173e-01 , loss_mse: 7.157e-01,  r: 0.500, K: 0.499900
It: 100, Loss: 3.733e-01 , loss_mse: 3.716e-01,  r: 

<Figure size 640x480 with 0 Axes>

In [None]:
# Inspect the fitted carrying capacity K; the synthetic data was simulated with K = 1.2.
model.K

Parameter containing:
tensor([1.2026], device='cuda:0', requires_grad=True)