**Import Required Libraries**

In [8]:
import torch     
from torch import Tensor
import torch.nn as nn                 
import torch.optim as optim              
import matplotlib.pyplot as plt
import numpy as np
import itertools
from collections import OrderedDict
import plotly.graph_objects as go # for plotting the graph

Load the training data from the NumPy `.npz` archive and convert it to PyTorch tensors

In [9]:
# Location of the aligned antiderivative training set (NumPy .npz archive).
# NOTE(review): this is an absolute, machine-specific path, so the notebook only runs
# on the original author's machine; consider a path relative to a configurable data dir.
TRAIN_DATA_PATH = r"C:\Users\shirs\Desktop\APL745 Deep Learning\Lab 12\antiderivative_aligned_train.npz"

# NOTE(security): allow_pickle=True makes np.load unpickle stored objects, which can
# execute arbitrary code -- only open archives from a trusted source. It is kept
# because "X" is indexed below as a pair of differently shaped arrays, which
# presumably means it is stored as an object array (TODO confirm).
# The `with` block closes the underlying archive once the arrays are copied out.
with np.load(TRAIN_DATA_PATH, allow_pickle=True) as d:
    train_inputs = d["X"]                          # read the "X" entry once
    u_train = train_inputs[0].astype(np.float32)   # input functions, expected (150,100)
    y_train = train_inputs[1].astype(np.float32)   # output locations, expected (100,1)
    s_train = d["y"].astype(np.float32)            # output functions, expected (150,100)

# These are plain data tensors: only the network weights are optimised, so they must
# not require gradients (doing so makes every backward() also compute unused
# gradients with respect to the data).
u_train = torch.tensor(u_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
s_train = torch.tensor(s_train, dtype=torch.float32)

**Define Network Architecture**

In [10]:
# Number of outputs of both the branch and the trunk network. The two networks must
# share this width because their outputs are combined with a matrix product in
# PI_DeepONet.operator_net.
# NOTE: "truck" in the name is a typo for "trunk"; the name is kept because other
# cells reference it.
branch_truck_output_size = 50

class branch_neural_network(nn.Module):
    """Branch network of the DeepONet: a fully connected net applied to the input functions.

    Four hidden linear layers, each followed by LeakyReLU, then a linear output layer
    with no activation.

    Args:
        input_size: number of features per input row. When omitted, falls back to
            ``u_train.shape[1]`` from the notebook namespace (the original behaviour).
        hidden_size: width of each of the four hidden layers (default 50).
        output_size: width of the output layer. When omitted, falls back to the
            notebook-level ``branch_truck_output_size`` (the original behaviour).
    """

    def __init__(self, input_size=None, hidden_size=50, output_size=None):
        super(branch_neural_network, self).__init__()

        # Resolve the notebook-level defaults lazily so the class can also be built
        # with explicit sizes, without any training tensor in scope.
        if input_size is None:
            input_size = u_train.shape[1]
        if output_size is None:
            output_size = branch_truck_output_size

        # Layers are created in the same order as before so that, for a fixed RNG
        # seed, the weight initialisation is unchanged.
        self.layer1 = nn.Linear(input_size, hidden_size)
        self.layer2 = nn.Linear(hidden_size, hidden_size)
        self.layer3 = nn.Linear(hidden_size, hidden_size)
        self.layer4 = nn.Linear(hidden_size, hidden_size)
        self.layer_out = nn.Linear(hidden_size, output_size)
        # Not used in forward(); kept so the module's attributes stay unchanged.
        self.tanh = nn.Tanh()
        # NOTE: despite the attribute name, the branch net uses LeakyReLU.
        self.relu = nn.LeakyReLU()

    def forward(self, u):
        """Map input functions ``u`` (rows of length ``input_size``) to branch outputs."""
        b = u
        for hidden_layer in (self.layer1, self.layer2, self.layer3, self.layer4):
            b = self.relu(hidden_layer(b))
        return self.layer_out(b)
    
class trunk_neural_network(nn.Module):
    """Trunk network of the DeepONet: a fully connected net applied to the output locations.

    Four hidden linear layers, each followed by ReLU, then a linear output layer with
    no activation.

    Args:
        input_size: number of features per location row. When omitted, falls back to
            ``y_train.shape[1]`` from the notebook namespace (the original behaviour).
        hidden_size: width of each of the four hidden layers (default 50).
        output_size: width of the output layer. When omitted, falls back to the
            notebook-level ``branch_truck_output_size`` (the original behaviour).
    """

    def __init__(self, input_size=None, hidden_size=50, output_size=None):
        super(trunk_neural_network, self).__init__()

        # Resolve the notebook-level defaults lazily so the class can also be built
        # with explicit sizes, without any training tensor in scope.
        if input_size is None:
            input_size = y_train.shape[1]
        if output_size is None:
            output_size = branch_truck_output_size

        # Layers are created in the same order as before so that, for a fixed RNG
        # seed, the weight initialisation is unchanged.
        self.layer1 = nn.Linear(input_size, hidden_size)
        self.layer2 = nn.Linear(hidden_size, hidden_size)
        self.layer3 = nn.Linear(hidden_size, hidden_size)
        self.layer4 = nn.Linear(hidden_size, hidden_size)
        self.layer_out = nn.Linear(hidden_size, output_size)
        # Not used in forward(); kept so the module's attributes stay unchanged.
        self.tanh = nn.Tanh()
        self.relu = nn.ReLU()

    def forward(self, y):
        """Map output locations ``y`` (rows of length ``input_size``) to trunk outputs."""
        t = y
        for hidden_layer in (self.layer1, self.layer2, self.layer3, self.layer4):
            t = self.relu(hidden_layer(t))
        return self.layer_out(t)

**The DeepONet Architecture**

In [11]:
class PI_DeepONet():
    """Trains and evaluates a DeepONet built from a branch and a trunk network.

    The operator output is the matrix product of the branch output and the transposed
    trunk output. Only a mean-squared-error data loss is implemented in this class.

    Args:
        u_train: input functions passed to the branch network during training.
        y_train: output locations passed to the trunk network during training.
        s_train: target values the operator output is fitted to.
    """

    def __init__(self, u_train, y_train, s_train):
        self.u_train = u_train
        self.y_train = y_train
        self.s_train = s_train
        # Assigned by train(); loss_func() needs it to reset gradients.
        self.optimizer = None

    def operator_net(self, branch_net, trunk_net, u, y):
        """Evaluate the DeepONet: ``branch_net(u) @ trunk_net(y).T``.

        Returns a tensor with one row per row of ``u`` and one column per row of ``y``.
        """
        B = branch_net(u)  # output of the branch network
        T = trunk_net(y)   # output of the trunk network
        return torch.matmul(B, torch.transpose(T, 0, 1))

    def loss_operator(self, output, s_train):
        """Mean squared error between ``output`` and the target ``s_train``.

        The target argument is honoured (it used to be silently ignored in favour of
        ``self.s_train``). Passing ``None`` falls back to ``self.s_train``.
        """
        if s_train is None:
            s_train = self.s_train
        return torch.mean(torch.square(output - s_train))

    def loss_func(self, output, s_train):
        """Compute the loss, reset the optimizer's gradients and back-propagate.

        Raises:
            RuntimeError: if no optimizer is available yet, i.e. train() has not been
                called and ``self.optimizer`` was not assigned manually.
        """
        if self.optimizer is None:
            raise RuntimeError(
                "PI_DeepONet.loss_func needs an optimizer: call train() first "
                "or assign self.optimizer."
            )
        loss = self.loss_operator(output, s_train)
        self.optimizer.zero_grad()
        loss.backward()
        return loss

    def train(self, epochs, log_every=500):
        """Train freshly created branch and trunk networks with Adam on the stored data.

        Args:
            epochs: number of full-batch optimisation steps.
            log_every: print and record the loss every this many epochs (default 500).

        Returns:
            Tuple ``(branch_net, trunk_net)`` of the trained networks.

        Raises:
            ValueError: if ``log_every`` is smaller than 1.
        """
        if log_every < 1:
            raise ValueError("log_every must be a positive integer")

        self.epochs = epochs
        branch_net = branch_neural_network()  # the branch network
        trunk_net = trunk_neural_network()    # the trunk network
        # A single optimizer updates the parameters of both networks.
        params = list(branch_net.parameters()) + list(trunk_net.parameters())
        optimizer = torch.optim.Adam(params)
        self.optimizer = optimizer  # makes loss_func() usable after/while training
        branch_net.train()  # set the networks to training mode
        trunk_net.train()

        model_loss = []
        epochs_list = []
        for epoch in range(epochs):
            optimizer.zero_grad()
            output = self.operator_net(branch_net, trunk_net, self.u_train, self.y_train)
            loss = self.loss_operator(output, self.s_train)
            loss.backward()
            optimizer.step()

            if epoch % log_every == 0:
                loss_value = loss.item()
                epochs_list.append(epoch)
                model_loss.append(loss_value)
                print(f'Epoch {epoch:03}: | Total Loss: {loss_value:.10f}')

        self._plot_loss_history(epochs_list, model_loss)
        return branch_net, trunk_net

    def _plot_loss_history(self, epochs_list, model_loss):
        """Show the recorded loss values against their epochs as a plotly figure."""
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=epochs_list, y=model_loss, mode='lines+markers', name='Total Loss'))
        fig.update_layout(title='Loss vs Epochs', xaxis_title='Epochs', yaxis_title='Loss')
        fig.show()

    def predict_s(self, branch_net, trunk_net, u_test, y_test):
        """Evaluate the operator for the given networks at the test inputs and locations."""
        return self.operator_net(branch_net, trunk_net, u_test, y_test)

**Train the Model**

In [12]:
# Seed torch's RNG before the networks are created so that weight initialisation, and
# with it the training run, is repeatable across "Restart & Run All" (on the same
# machine and library versions). Loss values recorded from earlier, unseeded runs
# will not match exactly.
SEED = 0
N_EPOCHS = 12500  # number of full-batch Adam steps
torch.manual_seed(SEED)

model = PI_DeepONet(u_train, y_train, s_train)
branch_net_trained, trunk_net_trained = model.train(epochs=N_EPOCHS)

Epoch 000: | Total Loss: 0.2685185075
Epoch 500: | Total Loss: 0.0002108254
Epoch 1000: | Total Loss: 0.0000285151
Epoch 1500: | Total Loss: 0.0000197677
Epoch 2000: | Total Loss: 0.0000572557
Epoch 2500: | Total Loss: 0.0000253032
Epoch 3000: | Total Loss: 0.0000155667
Epoch 3500: | Total Loss: 0.0000165734
Epoch 4000: | Total Loss: 0.0000206152
Epoch 4500: | Total Loss: 0.0000206019
Epoch 5000: | Total Loss: 0.0000077130
Epoch 5500: | Total Loss: 0.0000038857
Epoch 6000: | Total Loss: 0.0000047317
Epoch 6500: | Total Loss: 0.0000065279
Epoch 7000: | Total Loss: 0.0000653239
Epoch 7500: | Total Loss: 0.0000364495
Epoch 8000: | Total Loss: 0.0000015938
Epoch 8500: | Total Loss: 0.0000027525
Epoch 9000: | Total Loss: 0.0000012964
Epoch 9500: | Total Loss: 0.0000018361
Epoch 10000: | Total Loss: 0.0000453029
Epoch 10500: | Total Loss: 0.0000055217
Epoch 11000: | Total Loss: 0.0000012056
Epoch 11500: | Total Loss: 0.0000037019
Epoch 12000: | Total Loss: 0.0000010919


**Load Test Data**

In [13]:
# Aligned antiderivative test set, resolved relative to the working directory.
TEST_DATA_PATH = "antiderivative_aligned_test.npz"

# NOTE(security): allow_pickle=True makes np.load unpickle stored objects, which can
# execute arbitrary code -- only open archives from a trusted source.
# The `with` block closes the underlying archive once the arrays are copied out.
with np.load(TEST_DATA_PATH, allow_pickle=True) as d:
    test_inputs = d["X"]                        # read the "X" entry once
    u_test = test_inputs[0].astype(np.float32)  # input functions (branch input)
    # The earlier `y_test = d["X"][0]` assignment was dead code (overwritten on the
    # next line) and pointed at the input functions; only index 1 is used.
    y_test = test_inputs[1].astype(np.float32)  # output locations (trunk input)
    s_test = d["y"].astype(np.float32)          # reference output functions

# Evaluation data does not need gradients.
u_test = torch.tensor(u_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32)
s_test = torch.tensor(s_test, dtype=torch.float32)

In [14]:
# Model predictions. No gradients are needed for evaluation, so skip building the graph.
with torch.no_grad():
    s_pred = model.predict_s(branch_net_trained, trunk_net_trained, u_test, y_test)

# Convert to numpy under NEW names: overwriting y_test / s_test with numpy arrays made
# this cell fail when re-run, because predict_s then received arrays instead of tensors.
y_plot = y_test.detach().numpy().reshape(-1)
n_locations = y_plot.size
# One row per test function, one column per output location.
s_pred_np = s_pred.detach().numpy().reshape(-1, n_locations)
s_true_np = s_test.detach().numpy().reshape(-1, n_locations)

# Plot a single test function. Flattening every function into one long array while x
# only holds one value per location gives traces whose x and y lengths disagree
# whenever the test set contains more than one function.
SAMPLE_IDX = 0  # index of the test function to visualise

# Plot visualization
fig = go.Figure()
fig.add_trace(go.Scatter(x=y_plot, y=s_true_np[SAMPLE_IDX], mode='markers', name='True'))
fig.add_trace(go.Scatter(x=y_plot, y=s_pred_np[SAMPLE_IDX], mode='lines', name='Predicted'))
fig.update_layout(title='True vs Predicted', xaxis_title='y', yaxis_title='s',height=650)
fig.show()