In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from tensorflow.keras.layers import RNN, Dense, Layer
from tensorflow.keras import Sequential
from tensorflow.keras.optimizers import RMSprop
from tensorflow.python.framework import tensor_shape
from tensorflow import float32, concat, convert_to_tensor, linalg, random

2023-06-16 14:03:31.883255: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-06-16 14:03:31.957765: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.


In [13]:
class EMIntegratorCell(Layer):
    def __init__(self, m, damping, dt, initial_state, **kwargs):
        super(EMIntegratorCell, self).__init__(**kwargs)
        self.m = m
        self.damping = damping
        self.initial_state = initial_state
        self.state_size = 2 * len(m)
        self.dt = dt

    def call(self, inputs, states):
        t = inputs
        y = states[0][:, :1]
        ydot = states[0][:, 1:]
        yddoti = self._fun(self.m, self.damping, y, ydot)
        y_new = y + ydot * self.dt
        ydot_new = ydot + yddoti
        return y_new, [concat([y_new, ydot_new], axis=1)]

    def _fun(self, m, damping, y, ydot):
        random_force = random.normal(shape=ydot.shape, dtype=float32) * np.sqrt(self.dt)
        return -damping * ydot / m + random_force

    def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
        return self.initial_state

In [14]:
def create_model(m, damping, dt, initial_state, batch_input_shape, return_sequences=True, unroll=False):
    emCell = EMIntegratorCell(m=m, damping=damping, dt=dt, initial_state=initial_state)
    PINN = RNN(cell=emCell, batch_input_shape=batch_input_shape, return_sequences=return_sequences, return_state=False, unroll=unroll)
    model = Sequential()
    model.add(PINN)
    model.compile(loss='mse', optimizer=RMSprop(1e4), metrics=['mae'])
    return model

In [15]:
if __name__ == "__main__":
    # mass and damping coefficient
    m = np.array([20.0], dtype='float32')
    damping = np.array([10.0], dtype='float32')

    # data
    df = pd.read_csv('./langevin_data.csv')
    t  = df[['t']].values
    dt = (t[1] - t[0])[0]
    xtrain = df[['t']].values
    ytrain = df[['v']].values

    # initial velocity
    initial_velocity = np.array([ytrain[0][0]], dtype='float32')

    # fitting physics-informed neural network
    initial_state = np.array([[initial_velocity]], dtype='float32')
    xtrain_reshaped = np.reshape(xtrain, (-1, 1, 1))
    ytrain_reshaped = np.reshape(ytrain, (-1, 1, 1))
    model = create_model(m, damping, dt, initial_state=initial_state, batch_input_shape=(None, 1, 1))
    yPred_before = model.predict(xtrain_reshaped)
    model.fit(xtrain_reshaped, ytrain_reshaped, epochs=10, steps_per_epoch=1, verbose=1)
    yPred = model.predict(xtrain_reshaped)

    # plotting prediction results
    plt.plot(xtrain, ytrain[:, 0], 'gray')
    plt.plot(xtrain, yPred_before[0, :, 0], 'r', label='before training')
    plt.plot(xtrain, yPred[0, :, 0], 'b', label='after training')
    plt.xlabel('t')
    plt.ylabel('v')
    plt.grid('on')
    plt.legend()
    plt.show()

ValueError: Exception encountered when calling layer "rnn_5" (type RNN).

Dimension 1 in both shapes must be equal, but are 0 and 1. Shapes are [1,0,1] and [1,1,1].

Call arguments received by layer "rnn_5" (type RNN):
  • inputs=tf.Tensor(shape=(None, 1, 1), dtype=float32)
  • mask=None
  • training=None
  • initial_state=None
  • constants=None