In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib import cm

from utils_2D import gaussian_process_2d, normalize, solve_allen_cahn, save_object, load_object, Net_FNN, Net_CNN, ONet

In [None]:
# Training-data generator: random 2D fields u sampled on a regular grid.
class Data:
    """Random 2D input fields on an ``n_grid`` x ``n_grid`` grid.

    On construction, a training set and a testing set of random fields are
    drawn via ``gaussian_process_2d`` and stored as float32 tensors:

    * ``X``       -- grid locations returned with the training draw
    * ``u_train`` -- training fields, shape (train_num, n_grid, n_grid, 1)
    * ``u_test``  -- testing fields, shape (test_num, n_grid, n_grid, 1)
    """

    def __init__(self, x, n_grid, length_scale, train_num, test_num):
        """Store the sampling configuration and draw both data sets.

        Args:
            x: domain description forwarded to ``gaussian_process_2d``.
            n_grid: number of grid points per side.
            length_scale: length scale(s) forwarded to ``gaussian_process_2d``.
            train_num: number of training fields to draw.
            test_num: number of testing fields to draw.
        """
        self.x, self.n_grid = x, n_grid
        self.length_scale = length_scale
        self.train_num, self.test_num = train_num, test_num
        self.__init_data()

    def __init_data(self):
        """Draw the training set first, then the testing set."""
        grid_points, train_fields = self.u_data(self.train_num)
        self.X = grid_points
        self.u_train = train_fields
        # The grid locations of the testing draw are discarded.
        self.u_test = self.u_data(self.test_num)[1]

    def u_data(self, n_samples=1):
        """Draw ``n_samples`` random fields.

        Returns:
            (X, us): float32 tensors; ``X`` holds the locations returned by
            ``gaussian_process_2d`` and ``us`` has shape
            (n_samples, n_grid, n_grid, 1).
        """
        coords, fields = gaussian_process_2d(
            self.x, self.n_grid, n_samples, self.length_scale, u_mean=0.
        )

        # Per the original note, `normalize` rescales the fields into [-1, 1]
        # (helper lives in utils_2D -- TODO confirm).
        fields = normalize(fields)
        # One image per sample plus a trailing channel axis.
        fields = np.expand_dims(
            fields.reshape(-1, self.n_grid, self.n_grid), axis=-1
        )

        return (
            tf.constant(coords, dtype=tf.float32),
            tf.constant(fields, dtype=tf.float32),
        )

    def x_data(self, Nx):
        """Return ``Nx`` uniformly random points in [-1, 1)^2 as a float32 tensor."""
        unit_samples = np.random.rand(Nx, 2)
        return tf.constant(unit_samples*2. - 1.0, dtype=tf.float32)

In [None]:
# Time-stepping and model constants
tao = 0.005  # time step
eps = 0.25   # physical constant: length scale

# Bounds of the 2D solution domain, packed as (a0, a1, b0, b1)
a0, a1 = -1, 1
b0, b1 = -1, 1
xd = (a0, a1, b0, b1)


def Fe(u):
    """Homogeneous free energy density, (u^2 - 1)^2 / 4."""
    well = u**2 - 1
    return well**2/4

In [None]:
# Sampling configuration for the random input fields
n_grid = 28                     # grid points per side
length_scale_list = [0.2, 0.5]  # forwarded to Data as `length_scale`
train_num, test_num = 400, 100  # number of training / testing fields

# Constructing Data draws both data sets immediately.
data_sensor = Data(
    x=xd,
    n_grid=n_grid,
    length_scale=length_scale_list,
    train_num=train_num,
    test_num=test_num,
)

In [None]:
# Preview one generated training field (sample index 1, its single channel)
sample_field = data_sensor.u_train[1, :, :, 0]
plt.imshow(sample_field, vmin=-1, vmax=1, cmap=cm.gray_r)
plt.colorbar()

In [None]:
# Grid locations that were returned together with the training fields
Xf = data_sensor.X

# Advance the raw random fields by one time step `tao`; the evolved fields are
# used as the training/testing inputs (per the original note, the boundary
# condition is satisfied after this step).
u_train = solve_allen_cahn(data_sensor.u_train, xd, n_grid, tao, eps)
u_test = solve_allen_cahn(data_sensor.u_test, xd, n_grid, tao, eps)

# Advance the inputs by one more step `tao` to obtain the ground-truth outputs
u_out_train = solve_allen_cahn(u_train, xd, n_grid, tao, eps)
u_out_test = solve_allen_cahn(u_test, xd, n_grid, tao, eps)

In [None]:
# The random fields live on a 28x28 grid; only a 14x14 sub-grid is fed to the
# branch net to reduce its number of input features.
n_grid_sample = 14
n_interval = n_grid // n_grid_sample  # stride between retained grid points

# Keep the full-resolution fields under a separate name, then thin out u_train.
# NOTE(review): u_train is rebound from itself, so running this cell twice
# sub-samples twice; u_test is not sub-sampled here -- confirm that is intended.
u_train_p = u_train
u_train = u_train_p[:, ::n_interval, ::n_interval, :]

In [None]:
# save all data as one pickle file
# save_object([Xf, u_train, u_out_train, u_test, u_out_test], 'Data/2D_AH_N2.pkl')
# Load the stored data set. This rebinds Xf, u_train, u_out_train, u_test and
# u_out_test, replacing the values computed by the cells above.
# NOTE(review): u_train_p is not among the loaded names, so it keeps whatever
# the sub-sampling cell produced in this session; confirm it still corresponds
# to the loaded u_train before pairing the two for training.
# NOTE(review): the file is described as a pickle -- only load files you trust.
Xf, u_train, u_out_train, u_test, u_out_test = load_object('Data/2D_AH_N2.pkl')

In [None]:
# Width of the last dense layer of both the branch and the trunk net
n_outs = 100

# Convolution layer specifications for the branch net
branch_c_1 = dict(
    filters=32, kernels=(3, 3), strides=(1, 1), padding='valid', activation=None
)
branch_c_2 = dict(
    filters=4, kernels=(3, 3), strides=(3, 3), padding='valid', activation=None
)
branch_c_3 = dict(
    filters=5, kernels=(7, 7), strides=(7, 7), padding='valid', activation=None
)

# Dense layers that follow the branch convolutions
branch_f = [dict(nodes=[n_outs], activation='relu')]

# Dense layers of the trunk net: two hidden layers, then a linear output layer
trunk_f = [
    dict(nodes=[100, 100], activation='relu'),
    dict(nodes=[n_outs], activation='linear'),
]

# Branch net: convolution stack followed by the dense head.
# NOTE(review): only branch_c_1 and branch_c_2 are used; branch_c_3 is defined
# but never passed to the network -- confirm that is intended.
branch_conv_specs = [branch_c_1, branch_c_2]
branch_dense_head = Net_FNN(branch_f)
net_branch = Net_CNN(branch_conv_specs, branch_dense_head)

# Trunk net, then the operator network that combines trunk and branch
net_trunk = Net_FNN(trunk_f)
onet = ONet(net_trunk, net_branch)


In [None]:
# Run one forward pass on a single training sample with the grid locations Xf.
# Presumably this builds the model weights so the summaries below can be
# printed -- TODO confirm. With the third argument set to True the call is
# unpacked as (u, du_x, du_y) in physics_informed_train_step.
onet(u_train[0:1,:,:,:], Xf, True)

In [None]:
# Print the summary of the combined operator network
onet.summary()

In [None]:
# Print the summary of the branch network on its own
net_branch.summary()


In [None]:
# Physics-informed training pairs: (sub-sampled branch input, full-resolution
# field), reshuffled over the whole set and served in batches of 256.
train_dataset = (
    tf.data.Dataset
    .from_tensor_slices((u_train, u_train_p))
    .shuffle(buffer_size=train_num)
    .batch(256)
)

In [None]:
def physics_informed_train_step(onet, up, Xf, up0):
    """Run one physics-informed optimisation step and return its loss terms.

    The loss is the sum of an energy term (gradient energy plus the free
    energy ``Fe`` scaled by ``1/eps**2``) and a distance term between the
    prediction and the reference field ``up0`` scaled by ``1/(2*tao)``.
    The update is applied through the module-level ``optimizer``.

    Args:
        onet: operator network; called as ``onet(up, Xf, True)`` it must
            return ``(u, du_x, du_y)``.
        up: batch of branch inputs.
        Xf: locations passed to the network.
        up0: batch of reference fields; flattened to (batch, -1) before use.

    Returns:
        (energy loss, distance loss, total loss)
    """
    with tf.GradientTape() as tape:
        u, du_x, du_y = onet(up, Xf, True)

        # Flatten the reference field per sample: 4D -> 2D
        target = tf.reshape(tf.squeeze(up0), [up0.shape[0], -1])

        grad_sq = du_x**2 + du_y**2
        energy_density = 0.5*grad_sq + 1/eps**2*Fe(u)
        energy_loss = tf.reduce_mean(energy_density) * (a1-a0)*(b1-b0)/4

        sq_dist = (target - u)**2
        distance_loss = tf.reduce_mean(sq_dist)/tao/2 * (a1-a0)*(b1-b0)/4

        total_loss = energy_loss + distance_loss

    variables = onet.trainable_variables
    optimizer.apply_gradients(zip(tape.gradient(total_loss, variables), variables))

    return energy_loss, distance_loss, total_loss

In [None]:
from sklearn.metrics import r2_score, mean_squared_error
# Compute the R2 score and the relative L2 error between prediction and truth.
# Both inputs are flattened to (n_samples, n_grid*n_grid) first.
def accuracy(u_pred, u_true):
    """Return ``(r2, l2_rel)`` for a batch of predicted fields.

    Args:
        u_pred: predicted fields; any shape that reshapes to
            (-1, n_grid*n_grid).
        u_true: reference fields with the same number of elements.

    Returns:
        r2: value returned by ``sklearn.metrics.r2_score(u_true, u_pred)``.
        l2_rel: ``||u_true - u_pred|| / ||u_true||`` over the whole batch.
            Note this is a relative L2 error, not a mean squared error.
    """
    u_pred = tf.reshape(u_pred, (-1, n_grid*n_grid))
    u_true = tf.reshape(u_true, (-1, n_grid*n_grid))

    r2 = r2_score(u_true, u_pred)

    # L2 relative error
    l2_rel = tf.norm(u_true-u_pred)/tf.norm(u_true)

    return r2, l2_rel


In [None]:
# Physics-informed training stage
nepochs = 100
# Module-level optimizer; physics_informed_train_step reads it as a global.
optimizer = tf.optimizers.Adam(learning_rate=0.001)

for epoch in range(nepochs):

    for up, up0 in train_dataset:

        loss_energy, loss_distance, loss = physics_informed_train_step(onet, up, Xf, up0)

    if (epoch+1) % 100 == 0:
        # NOTE(review): despite the *_test names, the metrics are evaluated on
        # the training inputs/outputs -- confirm that is intended.
        u_test_pred = onet(u_train, Xf)
        # accuracy() returns (R2, relative L2 error); there is no MSE here.
        r2_test, l2_rel_test = accuracy(u_test_pred, u_out_train)

        print(f'Epoch Inner:{epoch+1}, Loss:{loss:.4e}, Loss_energy:{loss_energy:.4e}, Loss_distance:{loss_distance:.4e}')
        print(f' R2:{r2_test:.4e}, L2_rel:{l2_rel_test:.4e}')

In [None]:
# Supervised training pairs: (sub-sampled branch input, solver output one step
# later), reshuffled over the whole set and served in batches of 256.
train_dataset_sup = (
    tf.data.Dataset
    .from_tensor_slices((u_train, u_out_train))
    .shuffle(buffer_size=train_num)
    .batch(256)
)

In [None]:
def suppervised_train_step(onet, up, Xf, uq):
    """Run one supervised optimisation step and return the batch loss.

    The loss is the mean squared difference between ``onet(up, Xf)`` and the
    reference fields ``uq``. The update is applied through the module-level
    ``optimizer``.

    Args:
        onet: operator network.
        up: batch of branch inputs.
        Xf: locations passed to the network.
        uq: batch of reference outputs; flattened to (batch, -1) before use.

    Returns:
        The scalar loss of this batch.
    """
    with tf.GradientTape() as tape:
        # Flatten the reference output per sample: 4D -> 2D
        target = tf.reshape(tf.squeeze(uq), [uq.shape[0], -1])
        prediction = onet(up, Xf)

        residual = target - prediction
        loss = tf.reduce_mean(residual**2)

    variables = onet.trainable_variables
    grads = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(grads, variables))

    return loss

In [None]:
%%time
# Supervised training stage
nepochs = 1000
# A fresh module-level optimizer replaces the one from the previous stage;
# suppervised_train_step reads it as a global.
optimizer = tf.optimizers.Adam(learning_rate=0.001)

for epoch in range(nepochs):

    for up, uq in train_dataset_sup:

        loss = suppervised_train_step(onet, up, Xf, uq)

    if (epoch+1) % 100 == 0:
        print(f'Epoch Outer:{epoch+1}, Loss:{loss:.4e}')

        # NOTE(review): despite the *_test names, the metrics are evaluated on
        # the training inputs/outputs -- confirm that is intended.
        u_test_pred = onet(u_train, Xf)
        # accuracy() returns (R2, relative L2 error); there is no MSE here.
        r2_test, l2_rel_test = accuracy(u_test_pred, u_out_train)
        print(f' R2:{r2_test:.4e}, L2_rel:{l2_rel_test:.4e}')


In [None]:
# Write each available metric to its own text file.
# NOTE(review): of the names below, only `r2_test` is assigned by the visible
# cells (as a single scalar); the others must be produced elsewhere. A metric
# that is not defined is reported and skipped instead of aborting the cell
# with a NameError.
_metric_files = (
    ('Data/training_err_2D.txt', 'train_err'),
    ('Data/testing_err_2D.txt', 'test_err'),
    ('Data/r2_train_2D.txt', 'r2_train'),
    ('Data/r2_test_2D.txt', 'r2_test'),
    ('Data/run_time_2D.txt', 'run_time'),
)

for _path, _name in _metric_files:
    if _name not in globals():
        print(f'Skipping {_path}: `{_name}` is not defined in this session')
        continue
    # np.savetxt only accepts 1-D or 2-D input, so promote scalars to 1-D.
    np.savetxt(_path, np.atleast_1d(globals()[_name]), fmt='%.4e')

In [None]:
# save model weights
#onet.save_weights('Data/onet_weights_2D_AC_Map')
# load model weights
# onet.load_weights('Data/onet_weights_2D_AC_Map')

In [None]:
# Roll the trained network forward from one training sample and plot snapshots.
i = 19 #12, 18
u0 = u_train[i:i+1, :, :, 0:1]  # sub-sampled branch input of sample i

fig, ax = plt.subplots(1,6, figsize=(26,4))
# First panel: the full-resolution starting field.
# NOTE(review): this panel uses vmin=-0.3 while the others (and the colorbar)
# use vmin=-1 -- confirm the differing colour scale is intended.
ax[0].imshow(u_train_p[i, :, :, 0], vmin=-0.3, vmax=1.0, cmap=cm.gray_r)
ax[0].axis('off')
# A separate loop name keeps the sample index `i` intact.
for panel in range(1, 6):
    # Five network steps between consecutive panels
    for _ in range(5):
        u0_p = onet(u0, Xf)
        u0_p = tf.reshape(u0_p, [1, n_grid, n_grid, 1])
        # Feed the prediction back in, thinned with the same stride that was
        # used to build u_train.
        u0 = u0_p[:, ::n_interval, ::n_interval, :]

    im = ax[panel].imshow(u0_p[0, :, :, 0], vmin=-1, vmax=1.0, cmap=cm.gray_r) #cmap=cm.gray_r
    ax[panel].axis('off')
# add colorbar
fig.subplots_adjust(right=0.95)
cbar_ax = fig.add_axes([0.96, 0.12, 0.01, 0.75])
fig.colorbar(im, cax=cbar_ax)
# plt.show() works with the notebook backend; Figure.show() only warns there.
plt.show()

In [None]:
# Reference rollout: advance the same sample with the numerical solver and plot
# snapshots at the same intervals as the network rollout above.
i = 19
u0 = u_train_p[i:i+1, :, :, 0:1]  # full-resolution field of sample i

fig, ax = plt.subplots(1,6, figsize=(26,4))
# First panel: the starting field.
# NOTE(review): this panel uses vmin=-0.3 while the others (and the colorbar)
# use vmin=-1 -- confirm the differing colour scale is intended.
ax[0].imshow(u_train_p[i, :, :, 0], vmin=-0.3, vmax=1.0, cmap=cm.gray_r)
ax[0].axis('off')
# A separate loop name keeps the sample index `i` intact.
for panel in range(1, 6):
    # Five solver steps between consecutive panels
    for _ in range(5):
        u0 = solve_allen_cahn(u0, xd, n_grid, tao, eps)
    # Only the state after the last step is drawn, so reshape once.
    utn = tf.reshape(u0 , [n_grid, n_grid])
    im = ax[panel].imshow(utn, vmin=-1.0, vmax=1.0, cmap=cm.gray_r)
    ax[panel].axis('off')
# add colorbar
fig.subplots_adjust(right=0.95)
cbar_ax = fig.add_axes([0.96, 0.12, 0.01, 0.75])
fig.colorbar(im, cax=cbar_ax)
# plt.show() works with the notebook backend; Figure.show() only warns there.
plt.show()

In [None]:
import matplotlib.animation as animation
from functools import partial

# def updatefig(*args):
def updatefig(frame_number, N):
    """Advance both rollouts by ``N`` steps and refresh the two images.

    The module-level states ``ut0`` (solver) and ``u0`` (network input) are
    updated in place, so every call continues from where the last one ended.

    Args:
        frame_number: index of the animation frame; only used for the label.
        N: number of time steps taken per frame (must be at least 1).

    Returns:
        (im1, im2): the updated image artists.
    """
    global ut0, u0, im1, im2

    for _ in range(N):

        # Reference solver step on the full-resolution field. The domain is
        # `xd`, as in every other solve_allen_cahn call in this notebook.
        ut0 = solve_allen_cahn(ut0, xd, n_grid, tao, eps)
        utn = tf.reshape(ut0 , [n_grid, n_grid])

        # Network step; the prediction is thinned with the same stride that
        # was used to build u_train before being fed back in.
        u0_p = onet(u0, Xf)
        u0_p = tf.reshape(u0_p, [1, n_grid, n_grid, 1])
        u0 = u0_p[:, ::n_interval, ::n_interval, :]

    im1.set_array(utn)
    im2.set_array(u0_p[0, :, :, 0])

    # NOTE(review): the label is derived from the frame index only, so it
    # does not include the N steps taken in this call -- confirm intended.
    t = frame_number*tao*N

    # update text (fig.texts[1] is the time label; fig.texts[0] is the suptitle)
    fig.texts[1].set_text(f'Time: {t:.2f}')

    return im1, im2

In [None]:
i = 12  # other samples tried: 9, 18
ut0 = u_train_p[i:i+1, :, :, 0:1]  # full-resolution state for the solver
u0 = u_train[i:i+1, :, :, 0:1]     # sub-sampled state fed to the network

fig, ax = plt.subplots(1, 2, figsize=(8,4))
ax_truth, ax_pred = ax

# Both panels start from the same initial field; updatefig replaces the data.
initial_frame = ut0[0, :, :, 0]
image_kwargs = dict(vmin=-0.3, vmax=1.0, cmap=cm.gray_r)
im1 = ax_truth.imshow(initial_frame, **image_kwargs)
im2 = ax_pred.imshow(initial_frame, **image_kwargs)

ax_truth.axis('off')
ax_pred.axis('off')

ax_truth.set_title('Ground Truth')
ax_pred.set_title('Prediction')

# Figure title first, then the time label: updatefig addresses the label as
# fig.texts[1], so this order must be kept.
fig.suptitle('Allen-Cahn Equation', fontsize=16)
fig.text(0.5, 0.08, 'Time: 0', ha='center', fontsize=12)

# Reserve room on the right for a shared colorbar
fig.subplots_adjust(right=0.9)
cbar_ax = fig.add_axes([0.92, 0.15, 0.02, 0.7])
fig.colorbar(im2, cax=cbar_ax)

# Build the animation: 12 frames, two time steps per frame
anim = animation.FuncAnimation(fig,  partial(updatefig, N=2), frames=12, interval=500, blit=True)
#anim.save('allen_cahn-3.gif', writer='imagemagick')
