In [None]:
use_cuda = True


## Basic Wave Equation Solver

In [None]:
import numpy as np
import torch
from scipy.signal import butter, lfilter, freqz

def butter_lowpass(cutoff, fs, order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a

def butter_lowpass_filter(data, cutoff, fs, order=5):
    b, a = butter_lowpass(cutoff, fs, order=order)
    y = lfilter(b, a, data)
    return y

def find_nearest(array, value):
    array = np.asarray(array)
    idx = (np.abs(array - value)).argmin()
    return idx


class WaveSolver():
    def __init__(self):
        super(WaveSolver, self).__init__()

        # Basic Parameter Set
        self.f = 4410      # sampling frequency
        self.T = 1/self.f    # sampling time
        self.dur = 0.1       # simulation duration

        self.numT = round(self.dur / self.T)
        self.t = np.linspace(0, self.dur, num=self.numT, endpoint=True ) # time vector

        self.numXs = 256      # spatial grid points
        self.l = 2.5          # length of the pipe
        self.xs = np.linspace(0.0, self.l, num=self.numXs, endpoint=True) # space vector

        self.c0 = 30       # propagation speed

    def create_pluck(self, xe):
        #xeVec = np.array([0.1*self.l, 0.2*self.l, 0.3*self.l]) # vector of excitation positions (can be extended)
        # xe = 0.1*l;
        xi = find_nearest(self.xs, xe * self.l)
        initial_condition = np.zeros((self.numXs, 1))
        # initial_condition[xi] = 1;
        xi = np.minimum(self.numXs-12,xi)
        initial_condition[xi:12+xi,0] = np.hanning(12)
        return initial_condition

    def create_random_initial(self):
        order = 6
        fs = 4410
        cutoff = 300
        initial_condition = np.random.randn(self.numXs, 1) * 10
        initial_condition[-50:] = 0
        initial_condition = butter_lowpass_filter(initial_condition.transpose(), cutoff, fs, order).transpose()

        return initial_condition


    def solve(self, initial_condition):
        # FTM Stuff
        Mu = 250       # number of eigenvalues
        mu = np.arange(1, Mu+1) # 1:Mu;
        c0 = self.c0
        l = self.l
        numT = self.numT
        numXs = self.numXs

        test = 1j*c0*mu*np.pi/l

        gmu = np.concatenate((mu*np.pi/l, mu*np.pi/l))
        smu = np.concatenate((1j*c0*mu*np.pi/l, -1j*c0*mu*np.pi/l))

        K1 = lambda x: 1j*np.sin(gmu*x) # @(x) 1j*sin(gmu*x);
        K2 = lambda x: 1j*smu*np.sin(gmu*x)
        Ka1 = lambda x: 1j/c0**2*np.conj(smu)*np.sin(gmu*x)
        Ka2 = lambda x: 1j*np.sin(gmu*x)

        nmu = 1./(l/2*(c0**2*smu + np.conj(smu)))

        A = np.diag(np.exp(smu*self.T))



        # Excitation for the wave equation is a simple delta-impulse at position xe
        # Possible extensions:
        # - excitation by a hamming window to have a more smooth excitation
        # - combination with a temporal excitation shape

        # initial_condition = initial_condition / sum(initial_condition**2) * self.T
        yi = np.zeros(2*Mu,dtype=complex)
        for xi in range(numXs) : #1:length(xs)
            yi += Ka2(self.xs[xi]) * self.T * initial_condition[xi]

        #yi = Ka2(xe)*self.T # set initial values for states

        # vectors
        ybar = np.zeros((2*Mu, numT),dtype=complex)

        # set initial states
        ybar[:,0] = yi

        test = range(1,numT)

        # processing to create time progression of individual states
        for k in range(1,numT) :
            ybar[:,k] = A@ybar[:,k-1]

        # create output signal over time at a single observation position
        # (maybe this part is not necessary, therefore it is commented)
        # xo = 0.7*l
        # c1 = K1(xo)
        # y = c1@ybar # recover deflection from states (inverse transformation)
        # y = np.real(y)

        # create spatial vectors.
        # Result y_x: spatial distribution of the deflection y on the pipe at all
        # temporal sampling points
        K1_x = np.zeros((numXs, 2*Mu),dtype=complex)
        y_x = np.zeros((numXs, numT),dtype=complex)

        for xi in range(numXs) : #1:length(xs)
            K1_x[xi,:] = K1(self.xs[xi])/nmu
            y_x[xi,:] = K1_x[xi,:]@ybar

        # take the real part because there might be a small imaginary part
        y_x = np.real(y_x)
        # y_x = y_x / 10**6 # scale the output to reasonable values around 1
        y_x = y_x / y_x.std();

        return y_x


## Generate Dataset

In [None]:
waveSolver = WaveSolver()

xeVec = np.array([0.7, 0.3, 0.1]) # relative position
xeVec = np.random.rand(10)

num_param_steps = (waveSolver.numT-2) * xeVec.size
grid_size = waveSolver.numXs
training_input = torch.zeros(num_param_steps, grid_size,2)
training_output = torch.zeros(num_param_steps, grid_size,1)

index = 0

for xe in xeVec :
    # initial_condition = waveSolver.create_pluck(xe)
    initial_condition = waveSolver.create_random_initial()
    y_x = waveSolver.solve(initial_condition)

    for k in range(2,waveSolver.numT) :
        training_input[index,:,0] = torch.tensor(y_x[:,k-1])
        training_input[index,:,1] = torch.linspace(0,1, grid_size)
        training_output[index,:,0] = torch.tensor(y_x[:,k])
        index += 1


In [None]:
import matplotlib.pyplot as plt
fig, axs =  plt.subplots(figsize=(12, 6))
axs.plot(waveSolver.xs, y_x[:,[1, 20, 30, 140]] + np.linspace(0,1,4)*10)
axs.plot(waveSolver.xs, training_input[2,:,0] - 10)

axs.set_xlabel('Space')
axs.set_ylabel('Amplitude')
axs.grid(True)

# from matplotlib.animation import FuncAnimation
# fig = plt.figure(figsize=(15,15))
# ax = plt.axes(xlim=(0, 2.5), ylim=(-.2, .2))
# line, = ax.plot([], [], lw=2)
#
# x,y = [], []
# def animate(i):
#     line.set_data(waveSolver.xs, y_x[:,i])
#
# ani = FuncAnimation(fig, animate, interval=30)
# ani.save('WaveAnimation.mp4')

Model definitions copied from https://github.com/zongyi-li/fourier_neural_operator

In [None]:
import torch.nn as nn
import torch.nn.functional as F

################################################################
#  1d fourier layer
################################################################
class SpectralConv1d(nn.Module):
    def __init__(self, in_channels, out_channels, modes1):
        super(SpectralConv1d, self).__init__()

        """
        1D Fourier layer. It does FFT, linear transform, and Inverse FFT.    
        """

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.modes1 = modes1  #Number of Fourier modes to multiply, at most floor(N/2) + 1

        self.scale = (1 / (in_channels*out_channels))
        self.weights1 = nn.Parameter(self.scale * torch.rand(in_channels, out_channels, self.modes1, dtype=torch.cfloat))

    # Complex multiplication
    def compl_mul1d(self, input, weights):
        # (batch, in_channel, x ), (in_channel, out_channel, x) -> (batch, out_channel, x)
        return torch.einsum("bix,iox->box", input, weights)

    def forward(self, x):
        batchsize = x.shape[0]
        #Compute Fourier coeffcients up to factor of e^(- something constant)
        x_ft = torch.fft.rfft(x)

        # Multiply relevant Fourier modes
        out_ft = torch.zeros(batchsize, self.out_channels, x.size(-1)//2 + 1,  device=x.device, dtype=torch.cfloat)
        out_ft[:, :, :self.modes1] = self.compl_mul1d(x_ft[:, :, :self.modes1], self.weights1)

        #Return to physical space
        x = torch.fft.irfft(out_ft, n=x.size(-1))
        return x

class FNO1d(nn.Module):
    def __init__(self, modes, width):
        super(FNO1d, self).__init__()

        """
        The overall network. It contains 4 layers of the Fourier layer.
        1. Lift the input to the desire channel dimension by self.fc0 .
        2. 4 layers of the integral operators u' = (W + K)(u).
            W defined by self.w; K defined by self.conv .
        3. Project from the channel space to the output space by self.fc1 and self.fc2 .
        
        input: the solution of the initial condition and location (a(x), x)
        input shape: (batchsize, x=s, c=2)
        output: the solution of a later timestep
        output shape: (batchsize, x=s, c=1)
        """

        self.modes1 = modes
        self.width = width
        self.padding = 2 # pad the domain if input is non-periodic
        self.fc0 = nn.Linear(2, self.width) # input channel is 2: (a(x), x)

        self.conv0 = SpectralConv1d(self.width, self.width, self.modes1)
        self.conv1 = SpectralConv1d(self.width, self.width, self.modes1)
        self.conv2 = SpectralConv1d(self.width, self.width, self.modes1)
        self.conv3 = SpectralConv1d(self.width, self.width, self.modes1)
        self.w0 = nn.Conv1d(self.width, self.width, 1)
        self.w1 = nn.Conv1d(self.width, self.width, 1)
        self.w2 = nn.Conv1d(self.width, self.width, 1)
        self.w3 = nn.Conv1d(self.width, self.width, 1)

        self.fc1 = nn.Linear(self.width, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = self.fc0(x)
        x = x.permute(0, 2, 1)
        # x = F.pad(x, [0,self.padding]) # pad the domain if input is non-periodic

        x1 = self.conv0(x)
        x2 = self.w0(x)
        x = F.gelu(x1) + x2

        x1 = self.conv1(x)
        x2 = self.w1(x)
        x = F.gelu(x1) + x2

        x1 = self.conv2(x)
        x2 = self.w2(x)
        x = F.gelu(x1) + x2

        x1 = self.conv3(x)
        x2 = self.w3(x)
        x = F.gelu(x1) + x2

        # x = x[..., :-self.padding] # pad the domain if input is non-periodic
        x = x.permute(0, 2, 1)
        x = self.fc1(x)
        x = F.gelu(x)
        x = self.fc2(x)
        return x


In [None]:
modes = 64 # 32
width = 32 # 16

epochs = 1000
learning_rate = 1e-4
batch_size = 64

if use_cuda :
    model = FNO1d(modes, width).to('cuda')
else :
    model = FNO1d(modes, width)


dataloader = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(training_input, training_output), batch_size=batch_size, shuffle=True)
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr = 1e-3, epochs=epochs, steps_per_epoch=len(dataloader))



for ep in range(epochs):
    for input, output in dataloader:
        if use_cuda :
            input, output = input.cuda(), output.cuda()
        optimizer.zero_grad()
        predicted_output = model(input)
        loss = torch.nn.functional.mse_loss(predicted_output, output)
        loss.backward()
        optimizer.step()
        scheduler.step()
    print("\r",'epochs:' + str(ep) + ', loss:' + str(loss.detach().cpu().numpy()), end = "")



Check output

In [None]:
import matplotlib.pyplot as plt

grid_start = 0
grid_end = 1
test_grid_size = 64
field_val = 2

model_input = torch.zeros(1,grid_size,2)
model_output = torch.zeros(1,grid_size,1)

testNum = 40
timeSteps = 3
model_input[:,:,:] = training_input[testNum,:,:]
model_output[:,:,:] = training_output[testNum,:,:]
model_output[:,:,:] = training_output[testNum+timeSteps-1,:,:]

initial_condition = waveSolver.create_pluck(0.3)
# initial_condition = waveSolver.create_random_initial()
y_x = waveSolver.solve(initial_condition)
model_input[0,:,0] = torch.tensor(y_x[:,testNum-1])
model_output[0,:,0] = torch.tensor(y_x[:,testNum-0])
model_output[0,:,0] = torch.tensor(y_x[:,testNum-0+timeSteps-1])

input_field =  model_input[0,:,0]

if use_cuda :
    model_input = model_input.to('cuda')

for _ in range(timeSteps): # repeatedly predict next step
    model_result = model(model_input)
    model_input[:,:,0] = model_result[:,:,0]

if use_cuda :
    model_output = model_output.cuda()

loss = torch.nn.functional.mse_loss(model_result, model_output)
print("\r",'loss:' + str(loss.detach().cpu().numpy()), end = "")

plt.figure()
plt.plot(input_field.data, label='input')
plt.plot(model_output[0,:,0].cpu().data + 10, label='next time step')
plt.plot(model_result.detach().cpu().flatten().numpy() + 20, label='predicted time step')
plt.legend(bbox_to_anchor=(0., 1.02, 1., .102), loc='lower left',
           ncol=2, mode="expand", borderaxespad=0.) 