In [None]:
import tensorflow as tf
import numpy as np
import scipy
import matplotlib.pylab as plt
import itertools
import random

import importlib
#MOR-Physics and (unstacked) DeepONets have been implemented in opreg.py
import opreg
importlib.reload(opreg);

# Burgers operator using DeepONet

Let's try to fit the Burgers operator on a periodic domain,

\begin{equation}
\partial_x u^2 = v
\end{equation}

given pairs of $u,v$

In [None]:
# generate some synthetic data
N = 256
samps = 10
L = 2.*np.pi
x = np.arange(0,N)/N*L
k = np.arange(0,N//2+1)
def burgers(samps):
    uh = 20*np.exp(2*np.pi*np.random.normal(0,1,(samps,N//2+1))*1.j)*scipy.special.erfc(k-3)
    u = np.fft.irfft(uh)
    v = np.fft.irfft(1.j*k*np.fft.rfft(u**2))
    return u,v
u,v = burgers(samps)
u_test,v_test = burgers(1)

In [None]:
#plots of a sample of u and v
fig,ax = plt.subplots(1,2,figsize=(8,4))
ax[0].plot(u[0])
ax[0].set_title('$u$ sample')
ax[1].plot(v[0])
ax[1].set_title('$v$ sample')
ax[1].plot()

In [None]:
#creates DeepONet operator. Plots DeepONet prediction before training

#trick for enforcing periodicity
y = np.stack([np.cos(x),np.sin(x)],axis=1)

with tf.device('/CPU:0'):
    op = opreg.DeepONetUnstacked(64,8,N,2) #makes DeepONet operator, args:(width,depth,dim u,dim y)
    plt.plot(v_test[0],label='true')
    plt.plot(op(u_test,y)[0],label='prediction')
    plt.legend()

In [None]:
with tf.device('/CPU:0'):
    # l2 loss
    def loss():
        return tf.reduce_sum((op(u,y) - v)**2)
        
    opt = tf.keras.optimizers.legacy.Adam(1e-3)
    @tf.function
    def train():
        with tf.GradientTape() as tape:
            tape.watch(op.trainable_variables)
            loss_ = loss()
        grad = tape.gradient(loss_,op.trainable_variables)
        opt.apply_gradients(zip(grad,op.trainable_variables))
        return loss_

In [None]:
for _ in range(1000):
    print(train().numpy())

In [None]:
#test function vs. DeepONet prediction

plt.plot(x,v[0],label='true')
plt.plot(x,op(u,y)[0],label='prediction')
plt.legend()

# Burgers equation using MOR-Physics

Let's try to recover the viscous Burgers equation. This time using MOR-Physics

\begin{aligned}
\partial_t u + \partial_x u^2 = \nu \partial_x^2 u \\
x \in [0,2\pi] \textrm{ (periodic) }
\end{aligned}

In [None]:
# generate some synthetic data. Plot a sample
N = 256
samps = 10
L = 2.*np.pi
x = np.arange(0,N)/N*L
k = np.arange(0,N//2+1)
T = 1.
Nt = 1000
dt = T/Nt
def IC(samps):
    uh = 20*np.exp(2*np.pi*np.random.normal(0,1,(samps,N//2+1))*1.j)*scipy.special.erfc(k-3)
    u = np.fft.irfft(uh)
    return u
def upd(u,dt):
    return u+dt*(-np.fft.irfft(1.j*k*np.fft.rfft(u**2) + 1e-1*k**2*np.fft.rfft(u)))
def burgersEqn(samps,dt,Nt):
    u = [IC(samps)]
    for t in range(Nt):
        u.append(upd(u[-1],dt))
    return np.stack(u,1)

u = burgersEqn(samps,dt,Nt)
u_test = burgersEqn(samps,dt,Nt)
plt.imshow(u_test[0],origin='lower',extent=[0,L,0,T])
plt.colorbar()
plt.xlabel('$x$')
plt.ylabel('$t$')
plt.figure()
plt.plot(x,u_test[0,::100].T)
plt.xlabel('x')
plt.ylabel('u')

print('data shape: ', u.shape)

In [None]:
importlib.reload(opreg)

with tf.device('/CPU:0'):
    #creates Forward Euler parameterized with a sum of 4 MOR-Physics operators
    op = opreg.FwdEuler(opreg.SumOPs([opreg.MORP(8,4,k) for _ in range(2)]),dt)
    #creates forward Euler update operator
    plt.imshow(op(u_test[0:1,0],1000).numpy()[0],origin='lower',extent=[0,L,0,T])
    plt.colorbar()
    plt.xlabel('$x$')
    plt.ylabel('$t$')

In [None]:
with tf.device('/CPU:0'):
    # l2 loss
    def loss(ud):
        Nt = ud.shape[1]
        u0 = ud[:,0]
        return tf.reduce_sum((op(u0,Nt-1) - ud[:,1:])**2)
    
    opt = tf.keras.optimizers.legacy.Adam(1e-3)
    @tf.function
    def train(ud):
        with tf.GradientTape() as tape:
            tape.watch(op.trainable_variables)
            loss_ = loss(ud)
        grad = tape.gradient(loss_,op.trainable_variables)
        opt.apply_gradients(zip(grad,op.trainable_variables))
        return loss_

    epochs=10
    batch_size = 10
    t_train = 5
    
    # Cartesian product of indicies from samples and indicies from time
    inds = np.array(list(itertools.product(np.arange(samps),np.arange(Nt-t_train))))
    # randomized
    np.random.shuffle(inds)
    
    for _ in range(epochs):
        for b in range(len(inds)//batch_size):
            bl = b*batch_size
            bh = (b+1)*batch_size
            #extract batch_size samples over t_train times
            u0 = u[inds[bl:bh][:,0],inds[bl:bh][:,1]]
            uNt = u[inds[bl:bh][:,0],inds[bl:bh][:,1]+t_train]
            ubatch = np.transpose(u[inds[bl:bh][:,0],inds[bl:bh][:,1][None]+np.arange(t_train)[:,None]],(1,0,2))
            print(train(ubatch).numpy())


In [None]:
# plots of predicted operator on test data
with tf.device('/CPU:0'):
    u_pred = op(u_test[0:1,0],1000).numpy()
    plt.imshow(u_pred[0],origin='lower',extent=[0,L,0,T])
    plt.colorbar()
    plt.xlabel('$x$')
    plt.ylabel('$t$')

In [None]:
fig,ax = plt.subplots(1,2,figsize=(8,4),sharex=True,sharey=True)
ax[0].plot(x,u_test[0,::100].T)
ax[1].plot(x,u_pred[0,::100].T)
ax[0].set_xlabel('x')
ax[1].set_xlabel('x')
ax[0].set_ylabel('u')

ax[0].set_title('test')
ax[1].set_title('predicted')