### Implementing RNN

In [None]:
import torch
import torch.nn as nn
import time
import numpy as np
import copy
import matplotlib.pyplot as plt
from matplotlib import cm
from scipy.integrate import solve_ivp
#!pip install tikzplotlib #uncomment for saving nice images
#import tikzplotlib
# set precision
torch.set_default_dtype(torch.float64)
print('Default data type:', torch.get_default_dtype())

Default data type: torch.float64


Constructing ResNet

In [None]:
def antiderivTanh(x): # activation function aka the antiderivative of tanh
    return torch.abs(x) + torch.log(1+torch.exp(-2.0*torch.abs(x)))


class ResNN(nn.Module):
    def __init__(self, din, m, dout, nTh=2):
        """
            ResNet N portion of Phi
        :param d:   int, dimension of space input (expect inputs to be d+1 for space-time)
        :param m:   int, hidden dimension
        :param nTh: int, number of resNet layers , (number of theta layers)
        """
        super().__init__()

        if nTh < 2:
            print("nTh must be an integer >= 2")
            exit(1)

        self.din = din
        self.dout = dout
        self.m = m
        self.nTh = nTh
        self.layers = nn.ModuleList([])
        self.layers.append(nn.Linear(din, m, bias=True)) # opening layer
        self.layers.append(nn.Linear(m,m, bias=True)) # resnet layers
        for i in range(nTh-2):
            self.layers.append(copy.deepcopy(self.layers[1]))
        self.layers.append(nn.Linear(m, dout)) # output layer
        self.act = antiderivTanh
        self.h = 1.0 / (self.nTh-1) # step size for the ResNet

    def forward(self, x):
        """
            N(s;theta). the forward propogation of the ResNet
        :param x: tensor nex-by-d+1, inputs
        :return:  tensor nex-by-1,   outputs
        """

        x = self.act(self.layers[0].forward(x))

        for i in range(1,self.nTh):
            x = x + self.h * self.act(self.layers[i](x))
        x = self.layers[-1](x)
        return x

In [None]:
class ResNNrk4(nn.Module):
    def __init__(self, din, m, dout, nTh=2):
        """
            ResNet N portion of Phi
        :param d:   int, dimension of space input (expect inputs to be d+1 for space-time)
        :param m:   int, hidden dimension
        :param nTh: int, number of resNet layers , (number of theta layers)
        """
        super().__init__()

        if nTh < 2:
            print("nTh must be an integer >= 2")
            exit(1)

        self.din = din
        self.dout = dout
        self.m = m
        self.nTh = nTh
        self.layers = nn.ModuleList([])
        self.layers.append(nn.Linear(din, m, bias=True)) # opening layer
        self.layers.append(nn.Linear(m,m, bias=True)) # resnet layers
        for i in range(nTh-2):
            self.layers.append(copy.deepcopy(self.layers[1]))
        self.layers.append(nn.Linear(m, dout)) # output layer
        self.act = antiderivTanh
        self.h = 1.0 / (self.nTh-1) # step size for the ResNet

    def forward(self, x):
        """
            N(s;theta). the forward propogation of the ResNet
        :param x: tensor nex-by-d+1, inputs
        :return:  tensor nex-by-1,   outputs
        """
  
        x = self.act(self.layers[0].forward(x))

        for i in range(1,self.nTh):
            k1= self.h / 6 * self.act(self.layers[i].forward(x))
            k2= self.h / 3 * self.act(self.layers[i].forward(x+self.h*k1/2))
            k3= self.h / 3 * self.act(self.layers[i].forward(x+self.h*k2/2))
            k4= self.h / 6 * self.act(self.layers[i].forward(x+self.h*k3))
            x = x + self.h * (k1+k2+k3+k4)
        x = self.layers[-1](x)

        return x

In [None]:
class HINNrk4(nn.Module):
    def __init__(self, din, m, dout, nTh=2):
        """
            ResNet N portion of Phi
        :param d:   int, dimension of space input (expect inputs to be d+1 for space-time)
        :param m:   int, hidden dimension
        :param nTh: int, number of resNet layers , (number of theta layers)
        """
        super().__init__()

        if nTh < 2:
            print("nTh must be an integer >= 2")
            exit(1)

        self.din = din
        self.dout = dout
        self.m = m
        self.nTh = nTh
        self.layers = nn.ModuleList([])
        self.layers.append(nn.Linear(din, m, bias=True)) # opening layer
        self.layers.append(nn.Linear(m,m, bias=True)) # resnet layers
        for i in range(nTh-2): #middle hidden layers
            self.layers.append(copy.deepcopy(self.layers[1]))
        self.layers.append(nn.Linear(m, dout)) # output layer
        self.act = antiderivTanh
        self.h = 1.0 / (self.nTh-1) # step size for the ResNet

    def forward(self, x):
        """
            N(s;theta). the forward propogation of the ResNet
        :param x: tensor nex-by-d+1, inputs
        :return:  tensor nex-by-1,   outputs
        """

        x = self.act(self.layers[0].forward(x))
        y = x
        z = 0 * y


        for i in range(1,self.nTh):
          #hamiltonian discritized equations
            k1_1= self.h  * self.act(self.layers[i].forward(y))
            k2_1= self.h  * self.act(self.layers[i].forward(y+self.h*k1_1/2))
            k3_1= self.h  * self.act(self.layers[i].forward(y+self.h*k2_1/2))
            k4_1= self.h  * self.act(self.layers[i].forward(y+self.h*k3_1))
            y = y + 1/6 * self.h * (k1_1+2*k2_1+2*k3_1+k4_1)

            k1_2= self.h  * self.act(self.layers[i].forward(z))
            k2_2= self.h  * self.act(self.layers[i].forward(z+self.h*k1_1/2))
            k3_2= self.h  * self.act(self.layers[i].forward(z+self.h*k2_1/2))
            k4_2= self.h  * self.act(self.layers[i].forward(z+self.h*k3_1))

            z = z + 1/6 * self.h * (k1_1+2*k2_1+2*k3_1+k4_1)

        y =  self.layers[-1](y)
        z =  self.layers[-1](z)

        # print("y size:")
        # print(y.size())
        # print("z size:")
        # print(z.size())


        x = torch.cat((y,z),dim = 1)
        # print(x.size())
        # x = self.layers[-1](x) #error occurs here

        return x

In [None]:
class HINNVerlet(nn.Module):
    def __init__(self, din, m, dout, nTh=2):
        """
            ResNet N portion of Phi
        :param d:   int, dimension of space input (expect inputs to be d+1 for space-time)
        :param m:   int, hidden dimension
        :param nTh: int, number of resNet layers , (number of theta layers)
        """
        super().__init__()

        if nTh < 2:
            print("nTh must be an integer >= 2")
            exit(1)

        self.din = din
        self.dout = dout
        self.m = m
        self.nTh = nTh
        self.layers = nn.ModuleList([])
        self.layers.append(nn.Linear(din, m, bias=True)) # opening layer
        self.layers.append(nn.Linear(m,m, bias=True)) # resnet layers
        for i in range(nTh-2): #middle hidden layers
            self.layers.append(copy.deepcopy(self.layers[1]))
        self.layers.append(nn.Linear(m, dout)) # output layer
        self.act = antiderivTanh
        self.h = 1.0 / (self.nTh-1) # step size for the ResNet

    def forward(self, x):
        """
            N(s;theta). the forward propogation of the ResNet
        :param x: tensor nex-by-d+1, inputs
        :return:  tensor nex-by-1,   outputs
        """

        x = self.act(self.layers[0].forward(x))
        y = x
        z = 0 * y


        for i in range(1,self.nTh):
          #hamiltonian discritized equations
          z = z - self.h*self.act(self.layers[i].forward(y)) #transpose?
          y = y + self.h*self.act(self.layers[i].forward(z))

        y =  self.layers[-1](y)
        z =  self.layers[-1](z)
        
        # print("y size:")
        # print(y.size())
        # print("z size:")
        # print(z.size())


        x = torch.cat((y,z),dim = 1)
        # print(x.size())
        # x = self.layers[-1](x) #error occurs here

        return x

Headers for printing

In [None]:
def print_headers( verbose: bool = True):
    r"""
    Print headers for nice training
    """
    loss_printouts = ('loss',)
    n_loss = len(loss_printouts)

    headers = (('', '', '|', 'running',) + (n_loss - 1) * ('',) + ('|', 'train',)
               + (n_loss - 1) * ('',) + ('|', 'valid',) + (n_loss - 1) * ('',))

    printouts = ('epoch', 'time') + 3 * (('|',) + loss_printouts)
    printouts_frmt = '{:<15d}{:<15.4f}' + 3 * ('{:<2s}' + n_loss * '{:<15.4e}')

    if verbose:
        print(('{:<15s}{:<15s}' + 3 * ('{:<2s}' + n_loss * '{:<15s}')).format(*headers))
        print(('{:<15s}{:<15s}' + 3 * ('{:<2s}' + n_loss * '{:<15s}')).format(*printouts))

    return headers, printouts, printouts_frmt

In [None]:
print_headers()

                              | running        | train          | valid          
epoch          time           | loss           | loss           | loss           


(('', '', '|', 'running', '|', 'train', '|', 'valid'),
 ('epoch', 'time', '|', 'loss', '|', 'loss', '|', 'loss'),
 '{:<15d}{:<15.4f}{:<2s}{:<15.4e}{:<2s}{:<15.4e}{:<2s}{:<15.4e}')

Create a training, validation, and test set for Peaks.

In [None]:
n_train = 4000      # number of training points
n_val = 300         # number of validation points
n_test = 300        # number of testing points
#import data and assign each array to a different value
import numpy as np
from pathlib import Path
odeDataPath = Path('sho_data.npz') #path to file
odeData = np.load(odeDataPath)
yData = np.reshape(odeData['y'],[-1,1]) #reshape the data so its [datapoints x 1] tensor
zData = np.reshape(odeData['z'],[-1,1])
tData = np.reshape(odeData['t'],[-1,1])

# assign data to x and y
x = torch.tensor(tData)
y = np.concatenate([yData, zData],axis=1) #combine the y and z data
y = torch.tensor(y)
print(y.size())
# no shuffling
x_train, y_train = x, y
x_val, y_val = x[3:17], y[3:17]
x_test, y_test = x[7:28], y[7:28]
# shuffle and split data
# idx = torch.randperm(n_train + n_val + n_test)
# x_train, y_train = x[idx[:n_train]], y[idx[:n_train]]
# x_val, y_val = x[idx[n_train:n_train + n_val]], y[idx[n_train:n_train + n_val]]
# x_test, y_test = x[idx[n_train + n_val:]], y[idx[n_train + n_val:]]

torch.Size([4000, 2])


Choose network and architecture

In [None]:
width = 16
depth = 4
f = HINNVerlet(1,width,1) #one output for HINN 2 for the others
#print(f)
# Pytorch optimizer for the network weights
optimizer = torch.optim.Adam(f.parameters(), lr=1e-3) #weight decay is for regularization weight_decay=1e-5 add this for regularization


Define Mean Square Error loss

In [None]:
print(f(x_train))

tensor([[ 2.2282, -1.2061],
        [ 2.2274, -1.2061],
        [ 2.2266, -1.2060],
        ...,
        [12.3514, -4.4770],
        [12.3544, -4.4781],
        [12.3575, -4.4791]], grad_fn=<CatBackward0>)


In [None]:
def mse_loss(y_true: torch.Tensor,y: torch.Tensor):
  return (0.5 / y.shape[0]) * torch.norm(y_true - y.view_as(y_true)) ** 2

Training

In [None]:
# training parameters
max_epochs = 100
batch_size = 5

# get printouts
headers, printouts_str, printouts_frmt = print_headers()

# ---------------------------------------------------------------------------- #
# initial evaluation
# mse_loss = nn.MSELoss()


loss_train = mse_loss(f(x_train),y_train)
loss_val = mse_loss(f(x_val),y_val)

n_loss = 1
his_iter = (-1, 0.0) + ('|',) + (0,) + ('|',) + (loss_train.item(),) + ('|',) + (loss_val.item(),)
print(his_iter)
print(printouts_frmt.format(*his_iter))

# store history
his = np.array([x for x in his_iter if not (x == '|')]).reshape(1, -1)
# ---------------------------------------------------------------------------- #
# main iteration

log_interval = 5 # how often printouts appear
for epoch in range(max_epochs):
    t0 = time.perf_counter()
    # training here
    f.train()
    n = x_train.shape[0]
    b = batch_size
    n_batch = n // b
    loss = torch.zeros(1)
    running_loss = 0.0

    # shuffle
    idx = torch.randperm(n)

    for i in range(n_batch):
        idxb = idx[i * b:(i + 1) * b]
        xb, yb = x_train[idxb], y_train[idxb]

        optimizer.zero_grad()
        fb= f(xb)

        loss = mse_loss(fb,yb)
        running_loss += b * loss.item()

        # update network weights
        loss.backward(retain_graph= True)
        optimizer.step()

    running_loss = (running_loss / n,)
    
    t1 = time.perf_counter()

    # test
    loss_train = mse_loss(f,y_train)
    loss_val = mse_loss(f(x_val),y_val)
    t = t1-t0
    his_iter = (epoch, t1 - t0) + ('|',) + running_loss + ('|',) + (loss_train.item(),) + ('|',) + (loss_val.item(),)
    if epoch % log_interval == 0:
      print(printouts_frmt.format(*his_iter))

    # store history
    idx = [idx for idx, n in enumerate(np.array([x for x in printouts_str if not (x == '|')])) if n == 'loss'][1]
    his = np.concatenate((his, np.array([x for x in his_iter if not (x == '|')]).reshape(1, -1)), axis=0)
# ---------------------------------------------------------------------------- #
# overall performance on test data
loss_test = mse_loss(f(x_test), y_test)
print('Test Loss: %0.4e' % loss.item())

# convergence plots
fig = plt.figure()
linewidth = 3
idx = [idx for idx, n in enumerate(np.array([x for x in printouts_str if not (x == '|')])) if n == 'loss'][1]

plt.semilogy(his[:, 0], his[:, idx], linewidth=linewidth, label='f')

plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('HINN Verlet Training Loss') #change based on what method you are using
plt.legend()
plt.show()
#fig.savefig('conv_plot.png',dpi=300) #to save the image

                              | running        | train          | valid          
epoch          time           | loss           | loss           | loss           
(-1, 0.0, '|', 0, '|', 31.279190690216904, '|', 7.443099038470864)
-1             0.0000         | 0.0000e+00     | 3.1279e+01     | 7.4431e+00     
0              1.2827         | 2.5391e+00     | 2.0578e+00     | 2.8828e+00     
5              1.0215         | 1.7245e+00     | 1.6378e+00     | 2.7674e+00     
10             1.0384         | 8.4613e-01     | 7.8201e-01     | 4.7804e-02     
15             1.0168         | 6.1495e-01     | 5.6720e-01     | 3.4518e-01     
20             1.0095         | 4.9131e-01     | 4.4321e-01     | 6.9312e-02     
25             1.0163         | 1.9023e-01     | 1.4853e-01     | 4.8180e-02     
30             1.0200         | 3.9399e-02     | 4.2290e-02     | 1.7485e-02     
35             1.0437         | 2.0082e-02     | 4.3530e-02     | 3.1354e-02     
40             1.0185         |

KeyboardInterrupt: ignored

Testing the network on unseen timesteps

In [None]:
fig = plt.figure()
t_test = torch.tensor(np.linspace(0,15,2000)).reshape(-1,1)
plt.plot(t_test.detach().numpy(),f(t_test)[:,0].detach().numpy(),x_train.detach().numpy(),f(x_train)[:,1].detach().numpy())
plt.title("RK4 SHO")
plt.xlabel("Time")
plt.legend('y','z')
plt.show()
#fig.savefig('conv_plot2.png',dpi=300) #to save the image

In [None]:
"""
#update the inputs to the plt.plot function, but idk how :(
plt.plot(sho.y[0], sho.y[1],'b.')
plt.xlabel(r"$y(t)$", fontsize = 15)
plt.ylabel(r"$\frac{dy}{dt}$", fontsize = 15)
plt.title('Hamiltonian', fontsize = 15)
"""