## Import Libraries

In [35]:
import numpy as np
from scipy.integrate import odeint
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import cv2 
from PIL import Image
from keras import layers,Model
from keras.models import Sequential
from keras.layers import Conv2D,MaxPooling2D,Dense,GlobalMaxPool2D,Conv2DTranspose,UpSampling2D
from keras.layers import Flatten,Dropout
from keras.optimizers import SGD , Adam
from keras.layers import BatchNormalization
from keras.layers import LSTM,SimpleRNN,GRU
from keras.layers import TimeDistributed

## Functions

In [None]:
def pendulum_equation(y0,t,m,l,g):
    """Right-hand side of the pendulum equation written as a first-order system.

    Args:
        y0: Two-element state ``(theta, x)`` where ``x`` is d(theta)/dt.
        t: Time value; accepted but not used in the computation.
        m: Mass; accepted but not used in the computation.
        l: Rod length.
        g: Gravitational acceleration.

    Returns:
        A list ``[d(theta)/dt, dx/dt]`` with ``dx/dt = -(g/l) * sin(theta)``.
    """
    angle, angular_velocity = y0
    angular_acceleration = -(g/l)*np.sin(angle)
    return [angular_velocity, angular_acceleration]

def plot_loss(history,title):
    """Plot the loss curves stored in a training history on a new figure.

    A fresh figure is opened on every call so that calling this function
    repeatedly (for example inside a loop over several models) produces one
    plot per call instead of stacking every curve on the same axes.

    Args:
        history: Object with a ``history`` dict holding a ``'loss'`` list and,
            optionally, a ``'val_loss'`` list.
        title: Text used as the plot title.
    """
    plt.figure()
    plt.plot(history.history['loss'],label='loss')
    # 'val_loss' is only plotted when present, so a history recorded without
    # validation data does not raise KeyError.
    if 'val_loss' in history.history:
        plt.plot(history.history['val_loss'],label='val_loss')
    plt.xlabel('Epoch')
    plt.ylabel('Cost')
    plt.title(title)
    plt.legend()
    plt.grid()


def animationpred(n,nof,model,x_test):
    """Roll the model forward on its own predictions and save the frames as a video.

    The first sequence of ``x_test`` is used as the initial input window. At
    every step the model's predicted frame is rendered, appended to the window
    and the oldest frame is dropped, so later predictions are based on earlier
    predictions.

    Args:
        n: Selects the output file: 1 -> ./Results/RNNFinal, 2 ->
            ./Results/GRUFinal, 3 -> ./Results/LSTMFinal. For any other value
            the animation is built but nothing is saved.
        nof: Integer frame-count parameter. ``nof + 1`` frames are rendered
            (the original loop bound was inclusive); the value is also
            embedded in the output file name.
        model: Object exposing ``predict``; the predicted frame is read as
            ``model.predict(window)[0][-1]``.
        x_test: Array of input sequences; only ``x_test[0]`` is used.
    """
    import os

    # Add the batch axis without hard-coding sequence length or image size.
    window = np.asarray(x_test[0])[np.newaxis, ...]
    # Show the last known input frame (drawn on the current figure, before the
    # animation figure is created).
    plt.imshow(window[0][-1])
    fig = plt.figure()

    frames = []
    for _ in range(nof + 1):
        next_frame = model.predict(window)[0][-1]
        frames.append([plt.imshow(next_frame, animated=True)])
        # Slide the window: drop the oldest frame, append the prediction.
        window = np.concatenate(
            (window[:, 1:], next_frame[np.newaxis, np.newaxis, ...]), axis=1
        )

    ani = animation.ArtistAnimation(fig, frames, interval=50, blit=True)
    targets = {
        1: './Results/RNNFinal/RNNanimation{}.mp4',
        2: './Results/GRUFinal/GRUanimation{}.mp4',
        3: './Results/LSTMFinal/LSTManimation{}.mp4',
    }
    template = targets.get(n)
    if template is not None:
        path = template.format(nof)
        # Create the output folder up front so saving does not fail on a
        # fresh checkout.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        ani.save(path, writer='ffmpeg', fps=20)
    # Release the animation figure; this function is called many times.
    plt.close(fig)

##  Generating Moving Pendulum animation 
This animation is generated based on the governing differential equation

In [36]:
# Physical parameters and initial condition of the pendulum
m = 1 # pendulum mass
l = 1 # rod length
g = 9.8
y0 = [np.pi/2 , 0.5] # Initial state
t = np.linspace(0,6,600)
# Integrate the ODE; column 0 of `ans` is the angle at each time sample
ans = odeint(pendulum_equation,y0,t,args=(m,l,g))

fig ,ax = plt.subplots()
mylist =[]
# One animation frame (rod, bob, time label) per time sample
for theta, stamp in zip(ans[:,0], t):
    bob_x = np.sin(theta)
    bob_y = -np.cos(theta)
    line, = ax.plot([0,bob_x],[0,bob_y],color='black',lw=4 )
    ball, = ax.plot(bob_x,bob_y,'o',markersize =35,color='blue')
    time = ax.text(-0.75,0,"Time is:"+"{:.2f}".format(stamp))
    mylist.append([line,ball,time])
ax.set_aspect('equal','datalim')
result = animation.ArtistAnimation(fig,mylist,interval=50)
result.save('animation.mp4',writer='ffmpeg' ,fps=20)
plt.close(fig)

## Generating training pictures 
Extract screenshots of the animation 

In [37]:
import os

# cv2.imwrite does not create missing folders and only reports failure through
# its return value, so make sure the destination exists first.
os.makedirs('./PendulumData', exist_ok=True)

myvideo = cv2.VideoCapture("./animation.mp4")
if not myvideo.isOpened():
    # Without this check a missing/unreadable video silently yields zero frames.
    raise IOError("Could not open ./animation.mp4")

framenumber = 0
try:
    while True:
        # `cap` is the success flag returned by read(); False means no more frames.
        cap, frame = myvideo.read()
        if not cap:
            break
        target = './PendulumData/frame'+str(framenumber)+'.jpg'
        if not cv2.imwrite(target, frame):
            raise IOError("Could not write " + target)
        framenumber += 1
finally:
    # Always release the capture handle, even if writing a frame failed.
    myvideo.release()
cv2.destroyAllWindows()

## Training data pre-processing

In [48]:
# Load and preprocess images
def load_and_preprocess_images(path, num_images, size=(80, 80)):
    """Read ``frame0.jpg`` .. ``frame{num_images-1}.jpg`` from ``path`` as one array.

    Each image is resized to ``size`` and its pixel values are divided by
    255.0. The per-image arrays are stacked in index order.

    Args:
        path: Folder containing the ``frame{i}.jpg`` files.
        num_images: Number of consecutive frames to read, starting at 0.
        size: Target size passed to ``Image.resize``.

    Returns:
        ``np.array`` built from the list of scaled image arrays.
    """
    def _read_scaled(index):
        frame = Image.open(f'{path}/frame{index}.jpg').resize(size)
        return np.asarray(frame) / 255.0

    return np.array([_read_scaled(index) for index in range(num_images)])

# Prepare training and testing data
def prepare_data(data, split_index):
    """Split ``data`` into next-step (input, target) pairs for train and test.

    ``data[:split_index]`` is the training part and the remainder is the test
    part, from which the last 8 entries are discarded. Within each part the
    targets are the inputs shifted forward by one position.

    Args:
        data: Sliceable sequence of samples in temporal order.
        split_index: Position at which the sequence is cut into train/test.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)``.
    """
    train_part, test_part = data[:split_index], data[split_index:]
    test_part = test_part[:-8]
    return train_part[:-1], train_part[1:], test_part[:-1], test_part[1:]

# Reshape data for training, validation, and testing
def reshape_data(x_data, y_data, sequence_length):
    """Cut the data into sliding input windows with one target sample each.

    Window ``i`` is ``x_data[i:i + sequence_length]`` and its target is
    ``y_data[i + sequence_length - 1]``. ``len(x_data) - sequence_length``
    windows are produced (the same count as before; the final possible window
    is not used). ``y_data`` is expected to be at least as long as ``x_data``.

    When the input is not longer than ``sequence_length`` the result is a pair
    of empty arrays that still carry the full trailing shape, instead of a
    shapeless array or a reshape error.

    Args:
        x_data: Array-like of input samples, first axis is the sample index.
        y_data: Array-like of target samples aligned with ``x_data``.
        sequence_length: Number of consecutive samples per window (>= 1).

    Returns:
        ``(x_new, y_new)`` with shapes
        ``(n, sequence_length, *x_data.shape[1:])`` and
        ``(n, 1, *y_data.shape[1:])``.

    Raises:
        ValueError: If ``sequence_length`` is smaller than 1.
    """
    if sequence_length < 1:
        raise ValueError("sequence_length must be at least 1")

    x_data = np.asarray(x_data)
    y_data = np.asarray(y_data)
    n_windows = max(len(x_data) - sequence_length, 0)

    # Row i of `window_index` holds the sample indices i .. i+sequence_length-1;
    # indexing with it gathers every window in one step (and yields a correctly
    # shaped empty array when n_windows is 0).
    window_index = np.arange(n_windows)[:, np.newaxis] + np.arange(sequence_length)
    x_new = x_data[window_index]

    # Target of window i is the sample aligned with the window's last element.
    first_target = sequence_length - 1
    y_new = y_data[first_target:first_target + n_windows][:, np.newaxis].copy()

    return x_new, y_new

# Main processing
# Dataset location and split configuration
data_path = './PendulumData'
num_images = 600
split_index = 551
sequence_length = 10

# Load all frames, then split them into next-step train/test pairs
data = load_and_preprocess_images(data_path, num_images)
x_train, y_train, x_test, y_test = prepare_data(data, split_index)

# Turn each split into fixed-length input windows with one target frame each
(x_train_final, y_train_final), (x_test_final, y_test_final) = (
    reshape_data(x_train, y_train, sequence_length),
    reshape_data(x_test, y_test, sequence_length),
)

# Report the resulting array shapes
print(f"x_train_new shape: {x_train_final.shape}")
print(f"y_train_new shape: {y_train_final.shape}")
print(f"x_test_new shape: {x_test_final.shape}")
print(f"y_test_new shape: {y_test_final.shape}")

x_train_new shape: (540, 10, 80, 80, 3)
y_train_new shape: (540, 1, 80, 80, 3)
x_test_new shape: (30, 10, 80, 80, 3)
y_test_new shape: (30, 1, 80, 80, 3)


## Deep Neural Network Architecture
Encoder / LSTM / Decoder

In [None]:
def finalmodel(n,opt,lfunc):
    """Build and compile the encoder / recurrent / decoder frame predictor.

    The encoder applies convolution + max-pooling to every frame of the input
    sequence and flattens each frame's feature map. A single recurrent layer
    with 500 units summarises the sequence, its output is reshaped to
    ``(1, 10, 10, 5)`` and the decoder upsamples that back to one image with
    3 channels.

    Args:
        n: Recurrent layer type: 1 -> SimpleRNN, 2 -> GRU, 3 -> LSTM.
        opt: Optimizer passed to ``model.compile``.
        lfunc: Loss passed to ``model.compile``.

    Returns:
        The compiled ``Sequential`` model.

    Raises:
        ValueError: If ``n`` is not 1, 2 or 3. Previously such a value
            silently produced a model without the recurrent/reshape stage.
    """
    if n not in (1, 2, 3):
        raise ValueError("n must be 1 (SimpleRNN), 2 (GRU) or 3 (LSTM), got {!r}".format(n))
    recurrent_layer = {1: SimpleRNN, 2: GRU, 3: LSTM}[n]

    def frame_conv(filters, **wrapper_kwargs):
        # Same 3x3 'same'-padded ReLU convolution applied to every time step.
        return TimeDistributed(
            Conv2D(filters, (3, 3), activation='relu',
                   kernel_initializer='he_uniform', padding='same'),
            **wrapper_kwargs
        )

    model = Sequential()

    ## Encoder part
    # The input shape belongs on the TimeDistributed wrapper (the layer that is
    # actually added to the model); on the inner Conv2D it is not picked up by
    # Sequential and the model stays unbuilt until the first fit/predict call.
    model.add(frame_conv(27, input_shape=(10, 80, 80, 3)))
    model.add(TimeDistributed(MaxPooling2D((2, 2))))
    for filters in (18, 9):
        model.add(frame_conv(filters))
        model.add(TimeDistributed(MaxPooling2D((2, 2))))
    model.add(frame_conv(4))
    model.add(TimeDistributed(Flatten()))

    ## Recurrent part (SimpleRNN / GRU / LSTM)
    model.add(recurrent_layer(500, activation='tanh',
                              kernel_initializer='he_uniform',
                              return_sequences=False))
    # 500 units -> a single time step holding a 10x10 map with 5 channels.
    model.add(layers.Reshape((1, 10, 10, 5)))

    ## Decoder part
    for filters in (4, 9, 18):
        model.add(frame_conv(filters))
        model.add(TimeDistributed(UpSampling2D((2, 2))))
    model.add(frame_conv(27))
    model.add(frame_conv(3))

    model.compile(optimizer=opt, loss=lfunc)
    return model


## LSTM

In [None]:
# Training
lstmmodel = finalmodel(3,'adam','mse')
lstmhistory = lstmmodel.fit(
    x_train_final,
    y_train_final,
    epochs=800,
    batch_size=10,
    validation_split=0.2,
    verbose=None,
)
lstmmodel.summary()
lstmmodel.save_weights('./Results/Weights/lstmmodel.h5')
# Evaluation
lstmmodel.evaluate(x_test_final,y_test_final)
# Predicting next N frames
# Frame-count settings, one saved animation per entry
noflist = [5,10,20,50,100]
for frame_count in noflist:
    animationpred(3,frame_count,lstmmodel,x_test_final)

## GRU

In [None]:
# Training
grumodel = finalmodel(2,'adam','mse')
gruhistory = grumodel.fit(x_train_final,y_train_final,epochs = 800, batch_size = 10,validation_split = 0.2 ,verbose = 1)
grumodel.summary()
# Evaluation
grumodel.evaluate(x_test_final,y_test_final)
grumodel.save_weights('./Results/Weights/grumodel.h5')
# Predicting next N frames with the GRU model itself (the LSTM model was
# passed here before, so the "GRU" animations were LSTM predictions).
for item in noflist:
    animationpred(2,item,grumodel,x_test_final)

## RNN

In [None]:
# Training
rnnmodel = finalmodel(1,'adam','mse')
rnnhistory = rnnmodel.fit(x_train_final,y_train_final,epochs = 800, batch_size = 10,validation_split = 0.2 ,verbose = 1)
rnnmodel.summary()
# Evaluation
rnnmodel.evaluate(x_test_final,y_test_final)
rnnmodel.save_weights('./Results/Weights/rnnmodel.h5')
# Predicting next N frames with the SimpleRNN model itself (the LSTM model was
# passed here before, so the "RNN" animations were LSTM predictions).
for item in noflist:
    animationpred(1,item,rnnmodel,x_test_final)

## Loss function
This section studies the effect of various loss functions on the performance of the DNN.

In [None]:
# Train one LSTM-based model per loss function and plot its loss curves
lossfunctions = ['mae','mse','huber','categorical_crossentropy']
for loss_name in lossfunctions:
    lossmodel = finalmodel(3,'adam',loss_name)
    losshistory = lossmodel.fit(
        x_train_final,
        y_train_final,
        epochs=800,
        batch_size=10,
        validation_split=0.2,
        verbose=None,
    )
    plot_loss(losshistory,loss_name)

## Optimizer 
This section studies the effect of various optimizers on the performance of the DNN.

In [None]:
# Train one LSTM-based model per optimizer and plot its loss curves
Optimizers = ["RMSProp"]
for item in Optimizers:
    optimizermodel = finalmodel(3,item,'mse')
    # Fit the model that was just built with this optimizer. `lossmodel` (left
    # over from the loss-function study) was being trained here instead, so
    # the optimizer under test was never actually used.
    optimizerhistory = optimizermodel.fit(x_train_final,y_train_final,epochs = 800, batch_size = 10,validation_split = 0.2 ,verbose = None)
    plot_loss(optimizerhistory,item)