In [1]:
# taken from https://georgemilosh.github.io/blog/2022/distill/
# check: https://stackoverflow.com/questions/64995683/loss-function-with-derivative-in-tensorflow-2
# NOTE - x_train[:i] - all samples in TS up to i , not the i'th sample !!
# u(x,t)
# Burgers eq.: u_t+uu_x-(0.001)u_xx = 0
# BC         : u(x,0) = sin(x)
# IC         : u(-1,t) = i(1,t) = 0 

In [2]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.losses import Loss
print("TensorFlow version:", tf.__version__)

TensorFlow version: 2.19.0


In [3]:
def boundery(x):
    return [np.sin(x)]      

def construct_training_set_BC(dx):
    x = np.pi*np.arange(-1, 1.1 , 0.1, dtype = np.float32)
    zeros = np.zeros(np.shape(x))
    y_train_BC = np.array(boundery(x))[0]
    x_train_BC = np.vstack((x, zeros)).T
    #print ('x_train_BC type = ',type(x_train_BC))
    #print ('y_train_BC type = ',type(y_train_BC))
    #print ('x_train_BC = ',x_train_BC)
    #print ('y_train_BC = ',y_train_BC)
    return x_train_BC,y_train_BC

def construct_training_set_IC(dt):
    t =  (np.arange(0, 1.0001 , dt, dtype = np.float32))
    Nt = np.shape(t)
    x1 = np.ones(Nt)
    xminus1 = x1-2.0
    x1_t = np.vstack((x1, t)).T.tolist()
    x_minus1_t = np.vstack((xminus1, t)).T.tolist()
    x_train_IC = np.array( x_minus1_t+x1_t)
    y_train_IC = np.zeros(np.shape(x_train_IC)[0])
    #print ('x_train_IC = ',x_train_IC)
    #print ('y_train_IC = ',y_train_IC)
    #print (np.shape(x_train_IC)[0])
    return x_train_IC,y_train_IC
            
def construct_training_set(dx,dt):
    x_train_BC,y_train_BC = construct_training_set_BC(dx)
    x_train_IC,y_train_IC = construct_training_set_IC(dt)    
    x_train =  np.array(x_train_BC.tolist() + x_train_IC.tolist())
    y_train =  np.array(y_train_BC.tolist() + y_train_IC.tolist())
    #print ('x_train = ',x_train)    
    #print ('y_train = ',y_train)
    return x_train,y_train

In [4]:
def init_model(num_hidden_layers=1, num_neurons_per_layer=2):
    model = tf.keras.Sequential()
    model.add(tf.keras.Input(shape=(2,))) 
    # Append hidden layers
    for _ in range(num_hidden_layers):
        model.add(tf.keras.layers.Dense(num_neurons_per_layer,
            activation=tf.keras.activations.exponential,
            kernel_initializer='glorot_normal'))
    # Output is one-dimensional
    model.add(tf.keras.layers.Dense(1))
    #print (model.get_layer(index=1))
    return model

def set_weights_for_specific_model_layer(model):
    layer_weights_shape = [w.shape for w in model.get_weights()]
    #print(f"layers weight shapes: {layer_weights_shape}")
    custom_kernel = np.ones(layer_weights_shape[0]) 
    custom_bias = np.zeros(layer_weights_shape[1])
    model.set_weights([custom_kernel, custom_bias]) 
    
def set_weights_as_one_for_model(model):
    current_weights = model.get_weights()
    new_weights = [np.ones_like(w) for w in current_weights]
    model.set_weights(new_weights)  

def set_internal_variables_for_model(model,m,b):
    tm = tf.convert_to_tensor(m)
    tb = tf.convert_to_tensor(b)
    first_layer  = model.layers[0]
    secend_layer = model.layers[1]
    set_weights_for_specific_model_layer(first_layer)
    set_weights_for_specific_model_layer(secend_layer)
    dense_layer = model.get_layer(index=0)
    dense_layer.set_weights([tm,tb])
    #print (' network trainable_variables  = ')
    #print( model.trainable_variables)
    #print ('')
    #print (' W1  = ')
    #print( model.trainable_variables[0].numpy())
    #print (' b1 = ')
    #print( model.trainable_variables[1].numpy())
    #print (' W2 = ')
    #print( model.trainable_variables[2].numpy())
    #print (' b2 = ')
    #print( model.trainable_variables[3].numpy())

def excat_solution(model,r):
    w11=model.trainable_variables[0][0][0].numpy()
    w12=model.trainable_variables[0][0][1].numpy()
    w21=model.trainable_variables[0][1][0].numpy()
    w22=model.trainable_variables[0][1][1].numpy()
    w13=model.trainable_variables[2][0].numpy()
    w23=model.trainable_variables[2][1].numpy()
    x = r.numpy()[0][0]
    y = r.numpy()[0][1]
    # print ('w11 w12 = ',w11,w12)
    # print ('w21,w22 = ',w21,w22)    
    # print ('w13,w23 = ',w13,w23)
    # print ('x, y = ',x,y)
    du_dx_dx = w13*w11**2*np.exp(w11*x + w21*y) + w23*w12**2*np.exp(w12*x + w22*y) 
    du_dy_dy = w13*w21**2*np.exp(w11*x + w21*y) + w23*w22**2*np.exp(w12*x + w22*y)    
    print ('exact du_dx_dx, du_dy_dy = ',du_dx_dx,du_dy_dy)
    return (du_dx_dx,du_dy_dy)


In [22]:
loss_fn = tf.keras.losses.MeanSquaredError(reduction='sum_over_batch_size',name='mean_squared_error')

def custom_mse_loss(y_true, y_pred):
    l_BC_IC = loss_fn(y_true,y_pred)
    #squared_difference = tf.square(y_true - y_pred) 
    #return tf.reduce_mean(squared_difference, axis=-1)
    return l_BC_IC

class Custom_PINN_Loss(Loss):
    def __init__(self,model,x , name='Custom_PINN_Loss',**kwargs):
        super().__init__(name=name)
        self.x =x
        self.model=model
        
    def call(self, y_true, y_pred):
        l1 = loss_fn(y_true, y_pred)
        # x = tf.constant([[1,2]], dtype=tf.float32)
        with tf.GradientTape(persistent=True) as tape2:
            tape2.watch(self.x)  # "The input x must be a tf.Variable or explicitly "watched" using tape.watch(x) to be tracked by GradientTape "    
            with tf.GradientTape(persistent=True) as tape1:
                tape1.watch(self.x)
                u = model(self.x)
                print('u = ',u)
                grad_u = tape1.gradient(u, self.x)        
            hessian_u = tape2.jacobian(grad_u, self.x)    
        ux = grad_u[0][0]
        ut = grad_u[0][1]
        uxx = hessian_u[0][0][0][0]
        #print ('gradient: ',grad_u)
        #print ('hessian',hessian_u)
        #print ('ux',ux)
        #print ('ut',ut)
        #print ('u',u)  
        l2 = ut + u*ux - (0.001)*uxx
        #print ('loss2 = ',loss2)       
        return l1+l2
       
    # Optional: Implement get_config to save and load the model with custom parameters
    def get_config(self):
        base_config = super().get_config()
        base_config['threshold'] = self.threshold
        return base_config


In [6]:

#                                                     M    A     I     N


In [11]:
x_train, y_train = construct_training_set(0.1,0.1)

In [12]:
model = init_model()
#model.summary()
m = [[1,2],[3,4]]
b = [0,0]
set_internal_variables_for_model(model,m,b)
x1 = tf.constant([[1,2]], dtype=tf.float32)
#excat_solution(model,x1)


In [13]:
sample_number = 4
predictions = model(x_train[:sample_number]).numpy()
#print (predictions)

In [24]:
#print ('custom_mse_loss = ',custom_mse_loss(y_train[:sample_number],predictions).numpy())
#print ('custom_mse_loss = ',Custom_PINN_Loss(y_train[:sample_number],predictions,threshold=1.3,x=x1).numpy())
CML = Custom_PINN_Loss(model,x1,model)
print (CML.call(y_train[:sample_number],predictions).numpy())

u =  tf.Tensor([[21998.096]], shape=(1, 1), dtype=float32)
[[9.401472e+08]]


In [21]:
#model.compile(optimizer='adam',loss=custom_mse_loss,metrics=['accuracy']) #  loss=custom_mse_loss
model.compile(optimizer='adam', loss=Custom_PINN_Loss(model,x1))
model.fit(x_train, y_train, epochs=5)

Epoch 1/5
u =  Tensor("compile_loss/Custom_PINN_Loss/sequential_1/dense_1_2/Add:0", shape=(1, 1), dtype=float32)
gradient:  Tensor("gradient_tape/compile_loss/Custom_PINN_Loss/sequential_1/dense_1/MatMul/MatMul:0", shape=(1, 2), dtype=float32)
hessian Tensor("compile_loss/Custom_PINN_Loss/Reshape_1:0", shape=(1, 2, 1, 2), dtype=float32)
u =  Tensor("compile_loss/Custom_PINN_Loss/sequential_1/dense_1_2/Add:0", shape=(1, 1), dtype=float32)
gradient:  Tensor("gradient_tape/compile_loss/Custom_PINN_Loss/sequential_1/dense_1/MatMul/MatMul:0", shape=(1, 2), dtype=float32)
hessian Tensor("compile_loss/Custom_PINN_Loss/Reshape_1:0", shape=(1, 2, 1, 2), dtype=float32)
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 63ms/step - loss: 1042248192.0000
Epoch 2/5
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 51ms/step - loss: 1020562880.0000
Epoch 3/5
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step - loss: 999340288.0000 
Epoch 4/5
[1m2/2[0m [

<keras.src.callbacks.history.History at 0x19f40162690>