In [599]:
import tensorflow.keras as keras
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output


# keras.utils.set_random_seed(253)

In [600]:
ALPHA = 1 # thermal diffusivity coefficient
T_STEP = .5
X_STEP = 1 # cant set as a small number? fix
SOURCE_TEMP = 100
BASE_TEMP = 0
LENGTH = 40
DURATION = 60

def initial_function(x: float)-> float: # assumes t = 0
    if x < .5 * LENGTH:
        return (x/LENGTH) * 2 * SOURCE_TEMP
    else:
        return SOURCE_TEMP - (((x - (.5*LENGTH))/LENGTH) * 2 * SOURCE_TEMP)
    
#  WIP
def heat_equation(T_x1, T_x2, T_x3)-> float:
    # T_x 1, 2, 3 = Temp at x-1, x, and x+1
    # temps are from previous time step
    return T_x2 + ((ALPHA * T_STEP) * ((T_x3 - (2 * T_x2) + T_x1) / X_STEP**2))

    
def heat_equation_boundary(t, x)-> float:
    if t == 0:
        return initial_function(x)
    if x == np.float32(LENGTH) or x == 0: # insulated ends
        return BASE_TEMP
    


In [601]:
t_set = np.arange(0, DURATION + T_STEP, T_STEP, dtype='float32')
x_set = np.arange(0, LENGTH + X_STEP, X_STEP, dtype='float32')
tx_set = []

x_initial = np.arange(0, LENGTH + X_STEP, X_STEP, dtype='float32')
t_initial = np.array([0 for item in x_initial], dtype='float32')
u_initial = np.array([initial_function(x) for x in x_initial], dtype='float32')
u_initial = tf.expand_dims(u_initial, axis=1)

# match paper
t_boundary = np.array([[item for item in t_set] + [item for item in t_set]], dtype='float32')
x_boundary = np.array([0 for item in t_set] + [LENGTH for item in t_set], dtype='float32')


for item in t_set:
    for item2 in x_set:
        tx_set.append(np.array([item, item2]))

tx_set = np.array(tx_set)



In [602]:
def make_network():
    layers = []         

    layers.append(keras.layers.Dense(16, activation = 'leaky_relu', input_shape = (None, 1, 2)))
    layers.append(keras.layers.Dropout(.5))
    layers.append(keras.layers.Dense(16, activation = 'leaky_relu'))
    layers.append(keras.layers.Dropout(.5))
    layers.append(keras.layers.Dense(16, activation = 'leaky_relu'))
    layers.append(keras.layers.Dropout(.5))
    layers.append(keras.layers.Dense(16, activation = 'leaky_relu'))
    layers.append(keras.layers.Dropout(.5))
    layers.append(keras.layers.Dense(1, activation = 'linear'))
    
    network = keras.Sequential(layers)


    return network


In [603]:
# residual is how accurate the output is with respect to target equation
def calc_residual(model: keras.Model, tx_list, alpha)-> tf.Tensor:

    with tf.GradientTape(persistent= True) as gt:

        t = tx_list[:, 0:1]
        x = tx_list[:, 1:2]
        t = tf.convert_to_tensor(t)
        x = tf.convert_to_tensor(x)

        t = tf.squeeze(t)
        x = tf.squeeze(x)

        gt.watch(t)
        gt.watch(x)

        u = model(tf.stack([t, x], axis=1))

        du_dx = gt.gradient(u, x)
        # print("u_x: " + str(du_dx))
        # u = tf.convert_to_tensor(tf.squeeze(u), dtype='float32')

    du_dt = gt.gradient(u, t)
    # print("u_t: " + str(du_dt))
    

    d2u_dx2 = gt.gradient(du_dx, x) # second derivative of predicted values
    # print("u_x2: " + str(d2u_dx2))
    del gt

    return du_dt - (alpha * d2u_dx2)


In [604]:
# calculate how accurate the model is with respect to the actual answers
def calc_loss(model: keras.Model, tx_list, alpha):
    """
    Custom loss function made for neural network. It calculates the residual, and then
    adds it to the difference in the boundary outputs and predicted values.
    Returns the loss.
    
    """

    residual = calc_residual(model, tx_list, alpha)  
    av_residual = tf.reduce_mean(tf.square(residual)) 


    # print(tf.stack([x_initial, t_initial], axis=1))
    # print(tf.stack([tf.squeeze(x_boundary), tf.squeeze(t_boundary)], axis=1))
    

    # at beginning, at t = 0, heat should be a gradient from BASE_TEMP at 0, SOURCE_TEMP in the middle, then back down to BASE_TEMP at the end
    u_i_pred = model(tf.stack([t_initial, x_initial], axis=1))

    u_i_diff = u_initial - u_i_pred

    # print(tf.stack([t_initial, x_initial], axis=1))
    # print(u_i_pred)
    # print(u_initial)
    # print(u_i_diff)

    # at any point in time, derivative of x = 0 and x = LENGTH should be 0, as the ends are insulated and heat should not enter or exit
    with tf.GradientTape() as gt:
        x_b = tf.convert_to_tensor(x_boundary)
        gt.watch(x_b)
        u_b_pred = model(tf.stack([tf.squeeze(t_boundary), tf.squeeze(x_b)], axis=1))

    du_dx_pred = gt.gradient(u_b_pred, x_b) # derivative should equal 0, insulated ends should not change temp

    # print(tf.reduce_mean(tf.square(u_i_diff)))

    # print("av residual: " + str(av_residual))
    # print("Initial condition difference: " + str(u_i_diff))
    # print("Boundary Condition diff: " + str(du_dx_pred))

    return av_residual + tf.reduce_mean(tf.square(u_i_diff)) + tf.reduce_mean(tf.square(du_dx_pred))

In [605]:
# gradient is of the loss function with respect to all the variables in the model
def calc_gradient(model: keras.Model, tx_list, alpha):
    """
    Calculates the gradient to apply to the neural network. Returns the
    gradient.
    
    """

    with tf.GradientTape() as gt:
        gt.watch(model.trainable_variables) # weights
        loss = calc_loss(model, tx_list, alpha)

    gradient = gt.gradient(loss, model.trainable_variables)

    # print(str(gradient))

    return loss, gradient

In [606]:
def train(model: keras.Model, tx_list, alpha, optimizer: keras.optimizers.Optimizer):
    """
    Executes one training step for the network. Calculates loss and gradient, applies the 
    gradient, then returns the loss.
    
    """
    
    loss, gradient = calc_gradient(model, tx_list, alpha)

    optimizer.apply_gradients(zip(gradient, model.trainable_variables))

    return loss

In [607]:
# train the model
epochs = 5000
i = 0
network = make_network()
# network.load_weights("1D_heat_equation_network.h5")

while i < epochs:
    
    loss = 0
    loss = train(network, tx_set, ALPHA, keras.optimizers.Adam())


    print("Epoch: " + str(i))
    print("Loss: " + str(loss))
    print("\n")
    # save every epoch so i can jump ship at any time and still have a network to show for it
    if i % 10 == 0:
        network.save("1D_heat_equation_network.h5")
    i += 1

network.save("1D_heat_equation_network.h5")

Epoch: 0
Loss: tf.Tensor(40910.99, shape=(), dtype=float32)


Epoch: 1
Loss: tf.Tensor(37662.863, shape=(), dtype=float32)


Epoch: 2
Loss: tf.Tensor(40910.99, shape=(), dtype=float32)


Epoch: 3
Loss: tf.Tensor(37662.863, shape=(), dtype=float32)


Epoch: 4
Loss: tf.Tensor(40910.99, shape=(), dtype=float32)


Epoch: 5
Loss: tf.Tensor(37662.863, shape=(), dtype=float32)


Epoch: 6
Loss: tf.Tensor(40910.99, shape=(), dtype=float32)


Epoch: 7
Loss: tf.Tensor(37662.863, shape=(), dtype=float32)


Epoch: 8
Loss: tf.Tensor(40910.99, shape=(), dtype=float32)


Epoch: 9
Loss: tf.Tensor(37662.863, shape=(), dtype=float32)


Epoch: 10
Loss: tf.Tensor(40910.99, shape=(), dtype=float32)


Epoch: 11
Loss: tf.Tensor(37662.863, shape=(), dtype=float32)


Epoch: 12
Loss: tf.Tensor(40910.99, shape=(), dtype=float32)


Epoch: 13
Loss: tf.Tensor(37662.863, shape=(), dtype=float32)


Epoch: 14
Loss: tf.Tensor(40910.99, shape=(), dtype=float32)


Epoch: 15
Loss: tf.Tensor(37662.863, shape=(), dtype=float

PermissionError: [Errno 13] Unable to synchronously create file (unable to open file: name = '1D_heat_equation_network.h5', errno = 13, error message = 'Permission denied', flags = 13, o_flags = 302)

In [608]:
actual_temps = []


for number in range(len(t_set)):

    if not number == 0:
        # normally I love list comprehensions, but WOW this is garbage
        actual_temps.append(np.array([heat_equation_boundary(t_set[number], x_set[i]) if i == 0 or i == len(x_set) - 1 else heat_equation(actual_temps[number-1][i-1], actual_temps[number-1][i], actual_temps[number-1][i+1]) for i in range(len(x_set))]))

    else: 
        actual_temps.append(np.array([heat_equation_boundary(t_set[number], x) for x in x_set]))
    

actual_temps = tf.convert_to_tensor(actual_temps, dtype='float32')

if True:
    network = make_network()
    network.load_weights("1D_heat_equation_network.h5")
    predictions = []

    x = tf.convert_to_tensor(x_set, dtype='float32')

    for number in t_set:

        time = np.array([number for item in x_set])

        time = tf.convert_to_tensor(time, dtype='float32')
        
        y = network(tf.stack([time, x], axis=1))
        predictions.append(y)



plt.ion()

# plt.xlabel("Position")
# plt.ylabel("Temp")
# plt.axis((-.2, 6, 0, 101))

# for item in predictions:
#     color = np.where(item < 20, 'k', np.where(item < 40, 'b', np.where(item < 60, 'g', np.where(item < 80, 'y', 'r'))))
#     plt.scatter(x, item, s=5, c=np.squeeze(color), linewidth=0)
    
#     display(plt.scatter(x, item, s=5, c=np.squeeze(color), linewidth=0))
#     clear_output(wait=True)
#     plt.pause(.1)

figure = plt.figure()
sub1 = figure.add_subplot(111)
sub1.set_xlabel("Position")
sub1.set_ylabel("Temp")
sub1.axis((-1, LENGTH + 5, 0, 101))

# line1, = sub1.plot(x, predictions[0], "r--")

for number in range(len(actual_temps)):
    color1 = np.where(actual_temps[number] < 20, 'k', np.where(actual_temps[number] < 40, 'b', np.where(actual_temps[number]< 60, 'g', np.where(actual_temps[number] < 80, 'y', 'r'))))    
    sub1.plot(x_set, actual_temps[number], 'b-', linewidth=0.5, zorder=0)
    sub1.scatter(x_set, actual_temps[number], s=5, c=np.squeeze(color1), zorder=1)

    if True:
        color2 = np.where(predictions[number] < 20, 'k', np.where(predictions[number] < 40, 'b', np.where(predictions[number]< 60, 'g', np.where(predictions[number] < 80, 'y', 'r'))))
        sub1.scatter(x_set, predictions[number], s=5, c=np.squeeze(color2), zorder=1)
        sub1.plot(x_set, predictions[number], 'r--', linewidth=0.5, zorder=0)

    
    sub1.axis((-1, LENGTH + 5, 0, 101))
    display(figure)
    clear_output(wait=True)
    plt.pause(.1)
    sub1.clear()

    

KeyboardInterrupt: 