<a href="https://colab.research.google.com/github/jamesafful/classprojects/blob/main/deeponet_ex1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt

In [None]:
def build_network(layer_dims, activation='tanh'):
	"""Build a fully connected ``keras.Sequential`` model.

	Args:
		layer_dims: Sequence of ints. The first entry is the number of input
			features; every following entry is the width of one Dense layer,
			in order.
		activation: Activation applied to every Dense layer, including the
			last one.

	Returns:
		A ``keras.Sequential`` made of an input placeholder followed by one
		Dense layer per entry of ``layer_dims[1:]``. Kernels use Glorot-uniform
		initialisation and biases start at zero.
	"""
	kinitializer = 'glorot_uniform'
	input_dim = layer_dims[0]
	hidden_layer_dims = layer_dims[1:]
	# ``shape`` must be a tuple. ``(input_dim)`` is only a parenthesised int,
	# so the trailing comma is required to describe a 1-D feature vector.
	stack = [keras.Input(shape=(input_dim,))]
	for units in hidden_layer_dims:
		stack.append(
			layers.Dense(units, activation=activation, kernel_initializer=kinitializer, bias_initializer='zeros')
		)
	return keras.Sequential(stack)

In [None]:
class DeepONet:
	"""Operator network made of a branch net, a trunk net and a scalar bias.

	Both sub-networks are created with ``build_network``; their outputs are
	contracted over the shared feature axis and the bias ``b`` is added.

	Args:
		layer_sizes_branch: Layer sizes handed to ``build_network`` for the
			branch net (first entry is its input width).
		layer_sizes_trunk: Layer sizes handed to ``build_network`` for the
			trunk net (first entry is its input width).
		activation_branch: Activation used by every branch layer.
		activation_trunk: Activation used by every trunk layer.
	"""

	def __init__(self, layer_sizes_branch, layer_sizes_trunk, activation_branch, activation_trunk):
		super().__init__()
		# Branch is built before trunk, matching the order the layers are created in.
		self.branch = build_network(layer_sizes_branch, activation_branch)
		self.trunk = build_network(layer_sizes_trunk, activation_trunk)
		# Single float32 bias, initialised to zero, added to every output entry.
		self.b = tf.Variable(tf.zeros(1, dtype=tf.float32))

	def __call__(self, inputs):
		"""Evaluate the network.

		Args:
			inputs: Indexable pair; ``inputs[0]`` is fed to the branch net and
				``inputs[1]`` to the trunk net.

		Returns:
			Tensor with one row per branch sample and one column per trunk
			sample (the ``"bi,ni->bn"`` contraction), shifted by ``b``.
		"""
		branch_features = self.branch(inputs[0])
		trunk_features = self.trunk(inputs[1])
		# Dot product over the shared feature axis ``i`` for every (b, n) pair.
		combined = tf.einsum("bi,ni->bn", branch_features, trunk_features)
		return combined + self.b

In [None]:
class AntiDerivative(object):
	"""Trainer that fits an operator network to paired input/target data.

	Args:
		net: Callable model exposing ``branch`` and ``trunk`` sub-models (each
			with ``trainable_weights``) and, optionally, a bias variable ``b``.
		train_data: Mapping with the keys ``"x_train"`` and ``"u_train"``.
		test_data: Mapping with the keys ``"x_test"`` and ``"u_test"``.
		log_path: Directory under which a ``plots`` sub-directory is created
			for diagnostic figures. When ``None`` no figures are produced.
	"""
	def __init__(self, net, train_data, test_data, log_path=None):
		super(AntiDerivative, self).__init__()
		self.net = net
		self.train_data = train_data
		self.test_data = test_data
		self.log_path = log_path

	@tf.function
	def train_step(self, x_train, u_train):
		"""Run one optimisation step on the mean-squared error.

		Args:
			x_train: Model input, passed unchanged to ``self.net``.
			u_train: Target values compared against the model output.

		Returns:
			The scalar loss evaluated before the weights were updated.
		"""
		branch_vars = list(self.net.branch.trainable_weights)
		trunk_vars = list(self.net.trunk.trainable_weights)
		# The output bias belongs to neither sub-network, so it has to be added
		# to an optimizer explicitly or it would never be updated. It is
		# grouped with the branch variables.
		bias = getattr(self.net, "b", None)
		if bias is not None:
			branch_vars.append(bias)

		# One non-persistent tape is enough: both gradient groups are requested
		# in a single call, so the tape's resources are released right after.
		with tf.GradientTape() as tape:
			u_hat = self.net(x_train)
			loss = tf.reduce_mean((u_train - u_hat)**2)
		gradients_branch, gradients_trunk = tape.gradient(loss, [branch_vars, trunk_vars])
		self.optimizer_B.apply_gradients(zip(gradients_branch, branch_vars))
		self.optimizer_T.apply_gradients(zip(gradients_trunk, trunk_vars))
		return loss

	def train(self, learning_rate=0.001, epochs=50000):
		"""Train the network and periodically save prediction plots.

		Every 1000 epochs the epoch number is printed and, if ``log_path`` was
		given, a 3x3 grid comparing predictions with ground truth on randomly
		chosen test samples is written to ``<log_path>/plots``.

		Args:
			learning_rate: Learning rate used by both Adam optimizers.
			epochs: Number of full-batch training steps.
		"""
		# setup optimizers
		self.optimizer_B = keras.optimizers.Adam(learning_rate=learning_rate)
		self.optimizer_T = keras.optimizers.Adam(learning_rate=learning_rate)

		x_test = self.test_data["x_test"]
		u_test = self.test_data["u_test"]
		# The same full batch is used at every step, so look it up once.
		x_train = self.train_data["x_train"]
		u_train = self.train_data["u_train"]

		plt_num_row = 3
		plt_num_col = 3
		# randomly select test ids for plotting (one per subplot)
		test_idx = np.random.randint(0, u_test.shape[0], size=(plt_num_row * plt_num_col,))

		# training
		for ep in range(epochs):
			self.train_step(x_train, u_train)
			if ep % 1000 != 0:
				continue
			print("Epoch {}".format(ep))
			if self.log_path is None:
				# No output directory configured: skip plotting entirely.
				continue
			u_hat = self.net(x_test)
			self._save_prediction_plots(ep, u_hat, u_test, test_idx, plt_num_row, plt_num_col)

	def _save_prediction_plots(self, ep, u_hat, u_test, test_idx, plt_num_row, plt_num_col):
		"""Write a grid of predicted-vs-true curves for epoch ``ep`` to disk."""
		fig, axs = plt.subplots(plt_num_row, plt_num_col, figsize=(3*plt_num_col,3*plt_num_row),
					subplot_kw={'aspect': 'auto'}, sharex=True, sharey=True, squeeze=True)

		for j in range(plt_num_col):
			for i in range(plt_num_row):
				# Column-major walk through the sampled test indices.
				I = test_idx[j * plt_num_row + i]
				axs[i][j].plot(u_hat[I,:])
				axs[i][j].plot(u_test[I,:])
				axs[i][j].legend(["predicted-#"+str(I), "ground-truth-#"+str(I)])

		plot_path = os.path.join(self.log_path, "plots")
		os.makedirs(plot_path, exist_ok=True)

		# Save and close through the figure handle so unrelated figures that
		# may be open are left untouched.
		fig.savefig(os.path.join(plot_path, 'contour_' + str(ep) + '.png'))
		plt.close(fig)

In [None]:
# Presumably the directory that holds the antiderivative dataset.
# NOTE(review): not referenced by any visible cell -- the dataset cell opens the
# .npz files by bare filename. Confirm whether it should be used there.
load_path = "/content/deeponet_antiderivative_aligned"
# Passed as ``log_path`` to the trainer, which writes its plots beneath it.
work_path = "/content/"

In [None]:
# Load dataset
# The files are opened relative to the current working directory.
# NOTE(review): allow_pickle=True lets np.load unpickle object arrays, which can
# execute arbitrary code -- only use it on files from a trusted source.
# The ``with`` blocks close each .npz archive once its arrays have been copied.
with np.load("antiderivative_aligned_train.npz", allow_pickle=True) as d:
	X_raw = d["X"]  # read once; every d[...] access decodes the entry again
	X_train = (X_raw[0].astype(np.float32), X_raw[1].astype(np.float32))
	y_train = d["y"].astype(np.float32)
with np.load("antiderivative_aligned_test.npz", allow_pickle=True) as d:
	X_raw = d["X"]
	X_test = (X_raw[0].astype(np.float32), X_raw[1].astype(np.float32))
	y_test = d["y"].astype(np.float32)

# Dictionaries in the layout expected by AntiDerivative.
train_data = {"x_train":X_train, "u_train":y_train}
test_data = {"x_test":X_test, "u_test":y_test}

In [None]:
# The branch net consumes X_train[0], so its input width must come from that
# array rather than from the target array.
m = X_train[0].shape[1] # number of sensor points
dim_x = 1 # input dimension
# Branch and trunk both end in 40 units so their outputs can be contracted.
deeponet = DeepONet([m, 40, 40], [dim_x, 40, 40], "tanh", "tanh")
model = AntiDerivative(deeponet, train_data, test_data, log_path=work_path)
model.train(learning_rate=0.001, epochs=5000)

Epoch 0
Epoch 1000
Epoch 2000
Epoch 3000
Epoch 4000
