# Notebook \#2 - Implementation of Fourier Neural Operator

In [14]:
import torch
import torch.nn as nn
import torch.nn.init as init
from datetime import datetime
from data import MultiFunctionDatasetODE, custom_collate_ODE_fn
from torch.utils.data import DataLoader
import torch.optim as optim
import matplotlib.pyplot as plt
import numpy as np
import plotter
import torch.fft
import torch.nn.functional as F

In [None]:
class SpectralConv1d(nn.Module):
    def __init__(self, in_channels, out_channels, modes1):
        super(SpectralConv1d, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.modes1 = modes1  # Number of Fourier modes to use
        self.scale = 1 / (in_channels * out_channels)
        # Initialize learnable complex weights for the Fourier modes
        self.weights1 = nn.Parameter(self.scale * torch.rand(in_channels, out_channels, self.modes1, dtype=torch.cfloat))

    def compl_mul1d(self, input, weights):
        # (batch, in_channel, x) and (in_channel, out_channel, x) -> (batch, out_channel, x)
        return torch.einsum("bix,iox->box", input, weights)

    def forward(self, x):
        batchsize = x.shape[0]
        # Compute Fourier coefficients via rFFT
        x_ft = torch.fft.rfft(x)
        # Create output tensor in Fourier domain and multiply relevant modes
        out_ft = torch.zeros(batchsize, self.out_channels, x.size(-1)//2 + 1, device=x.device, dtype=torch.cfloat)
        out_ft[:, :, :self.modes1] = self.compl_mul1d(x_ft[:, :, :self.modes1], self.weights1)
        # Return to physical domain via inverse rFFT
        x = torch.fft.irfft(out_ft, n=x.size(-1))
        return x

class FNO1d(nn.Module):
    def __init__(self, modes, width):
        super(FNO1d, self).__init__()
        self.modes1 = modes
        self.width = width
        
        # Lift the two-channel input (u and t) to a higher dimension
        self.fc0 = nn.Linear(2, self.width)  # Input channels are 2

        # Fourier layers with corresponding local convolution layers
        self.conv0 = SpectralConv1d(self.width, self.width, self.modes1)
        self.conv1 = SpectralConv1d(self.width, self.width, self.modes1)
        self.conv2 = SpectralConv1d(self.width, self.width, self.modes1)
        self.conv3 = SpectralConv1d(self.width, self.width, self.modes1)
        
        self.w0 = nn.Conv1d(self.width, self.width, 1)
        self.w1 = nn.Conv1d(self.width, self.width, 1)
        self.w2 = nn.Conv1d(self.width, self.width, 1)
        self.w3 = nn.Conv1d(self.width, self.width, 1)

        # Projection from the latent space back to the output function space
        self.fc1 = nn.Linear(self.width, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        # x has shape (batch, s, 2): channels for u and t
        x = self.fc0(x)  # Lifting the input to the latent dimension
        # Permute to (batch, channels, s) for convolution operations
        x = x.permute(0, 2, 1)

        # Four layers combining Fourier convolution with local convolution
        x = F.relu(self.conv0(x) + self.w0(x))
        x = F.relu(self.conv1(x) + self.w1(x))
        x = F.relu(self.conv2(x) + self.w2(x))
        x = self.conv3(x) + self.w3(x)

        # Permute back to (batch, s, channels) for the fully connected layers
        x = x.permute(0, 2, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


In [None]:
def training(model, optimizer, dataloader, num_epochs= 1000, physics_loss_func=None, initial_loss_func=None, boundary_loss_func= None, plot=False):
    losses = []
    for epoch in range(num_epochs):

        model.train()
        
        for u, t, t0, ut in dataloader:

            u = u.unsqueeze(1) 
            t.requires_grad_(True)
            batch_size = u.shape[0]
            n_points = t.shape[0]

            physics_loss = physics_loss_func(model, u, t, t0, ut, batch_size, n_points) if physics_loss_func is not None else torch.zeros(1, device=t.device)
            initial_loss = initial_loss_func(model, u, t, t0, ut, batch_size, n_points) if initial_loss_func is not None else torch.zeros(1, device=t.device)
            boundary_loss = boundary_loss_func(model, u, t, t0, ut, batch_size, n_points) if boundary_loss_func is not None else torch.zeros(1, device=t.device)
            loss = physics_loss + initial_loss + boundary_loss

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, '
                    f'initial_loss: {initial_loss.item():.6f}, physics_loss: {physics_loss.item():.6f}, '
                    f'time: {datetime.now().time()}')
            
            if plot == True:
                plotter.GRF_test(model,m=m,lb=grf_lb,ub=grf_ub)
                plotter.linear_test(model,m=m)
                plotter.optimal_test(model,m=m)
                plotter.constant_test(model,m=m)
                plotter.polynomial_test(model,m=m)
                plotter.sine_test(model,m=m)
        
        if (epoch + 1) % 100 == 0:
            timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
            model_filename = f'model_time_[{timestamp}]_loss_[{loss.item():.4f}].pth'
            torch.save(model.state_dict(), f"trained_models/fnn/{model_filename}")
                        


        losses.append(loss)


    return model, losses

IndentationError: unexpected indent (804876560.py, line 11)

In [4]:
def physics_loss_func(model, u, t, t0, ut, batch_size, n_points, dim_x = 1):

    x = model(u, t)

    # Physics loss
    dx = torch.zeros(batch_size, n_points, dim_x, device=t.device)
    
    # This loop is a bottleneck but i havent found a way to parallize this efficiently
    for b in range(batch_size):
        # Compute gradients for each batch independently
        dx[b] = torch.autograd.grad(x[b], t, torch.ones_like(x[b]), create_graph=True)[0]

    dx_dt = dx[:,:,0]

    # physics loss
    physics = dx_dt + x - ut
    physics_loss = torch.mean(physics**2)

    return physics_loss

def innitial_loss_func(model, u, t, t0, ut, batch_size, n_points):

    x_0 = model(u, t0)
    initial_loss = torch.mean((torch.ones_like(x_0) - x_0)**2)
    
    return initial_loss

In [None]:
# Model Parameters
m = 200         # sensor size (branch input size)
n_hid = 250     # layer's hidden sizes
p = 200         # output size
dim_x = 1       # trunk (trunk input size)
lr = 0.0001
num_epochs = 1000

# Specify device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialize model
model = FNO1d(modes=12, width=20)

#Initialize Optimizer
optimizer = optim.Adam(model.parameters(), lr=lr)

cuda


In [6]:
#Dataset parameters
n_functions = 10000
grf_lb = 0.02
grf_ub = 0.5
end_time = 1.0
num_domain = 200
num_initial = 20

dataset = MultiFunctionDatasetODE(
    m=m,
    n_functions=n_functions,
    function_types=['grf', 'linear', 'sine', 'polynomial','constant'],
    end_time = end_time,
    num_domain = num_domain,
    num_initial = num_initial,
    grf_lb = grf_lb,
    grf_ub = grf_ub
)

dataloader = DataLoader(dataset, batch_size=100, collate_fn=custom_collate_ODE_fn, shuffle=True)

In [9]:
model = FNO1d(modes=16, width=64, in_channels=1, out_channels=1)
input_tensor = torch.randn(32, 1, 100)  # [batch, channel, m]
output = model(input_tensor)

In [13]:
trained_model, lossses = training(model, optimizer, dataloader, num_epochs = num_epochs, physics_loss_func=physics_loss_func, initial_loss_func=innitial_loss_func, plot=True)

torch.Size([100, 200])


TypeError: FNO1d.forward() takes 2 positional arguments but 3 were given