<a href="https://colab.research.google.com/github/tuphr2234/ChaosPrediction/blob/main/Chaotic_Movement_Simulation_and_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Anvil Setup

Install Anvil uplink connection

In [None]:
!pip install anvil-uplink

Connect Colab with the Anvil project

In [41]:
import anvil.server
anvil.server.connect("M2SHTQKH2FUIJQQXXX4VKXEB-I37SIP4DTE6Z5NM3") # <----- [insert your anvil uplink here]

## Simulator

In [None]:
!pip install pandas

### N-Body Problem

#### The Body Class

In [43]:
import math
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import tensorflow as tf
from pathlib import Path

G=1 # gravitational force
@anvil.server.callable
class Body:
  # variables
  prevPosition = np.array([0,0])
  position = np.array([0,0])
  velocity = np.array([0,0])
  force = np.array([0,0])
  acceleration = np.array([0,0])
  mass = 0

  # constructor
  def __init__(self,position=[0,0],velocity=[0,0],mass=1,name="newBody"):
    self.position=np.array(position)
    self.velocity=np.array(velocity)
    #self.acceleration=np.array(acceleration)
    self.mass=mass
    self.prevPosition=np.array(position)
    self.name = name

  # set initial position
  def setInitialPostion(self,position):
    self.postion = np.array(position)
    self.prevPosition=np.array(position)

  # set initial velocity
  def setInitialVelocity(self,velocity):
    self.velocity = np.array(velocity)

  # set initial mass
  def setInitialMass(self,mass):
    self.mass = mass

  # update position
  def setPosition(self,t):
    self.prevPosition = self.position
    self.position=self.position+self.velocity*t+(self.acceleration/2)* t**2
  
  # update velocity
  def setVelocity(self,t):
    self.velocity=self.velocity+self.acceleration*t
  
  # get force
  def getForce(self,body):
    softening = 0.01
    Distance = body.prevPosition - self.position
    Rsquare = Distance[0]**2 + Distance[1]**2 #+ softening
  # F = G* self.mass*body.mass / Rsquare  
    F = (G* self.mass*body.mass / Rsquare) - 0.001 / (Rsquare * Rsquare) ### Lennard-Jones like force
    normalizeDis = Distance / math.sqrt(Rsquare)
    F = F*normalizeDis
  # if math.sqrt(Rsquare) < softening:
    #  F = -F 
    return F

  # update acceleration
  def setAcceleration(self,bodies):
    F=np.array([0,0])
    for body in bodies:
      F=F+self.getForce(body)
    self.acceleration = F/self.mass
   # print(f" {self.name} acceleration is {self.acceleration}")

#### Aditional Body functions

calculate the center of mass of the generated bodies

In [44]:
# return the center of mass of the bodies
def centerOfMass(bodies):
  out =np.array([0,0])
  for i in range(0,len(bodies),2):
    out = out + np.array(bodies[i],bodies[i+1])
  return out / (len(bodies)//2) 

In [45]:
# initialize bodies
def initialize_bodies(bodylist):
  bodies=[]
  for i,body in enumerate(bodylist):
    bodies.append(Body(body[0],body[1],1,"body"+str(i+1)))
  return bodies

### Double Pendulum

#### The Double Pendulum Class

In [46]:
from numpy import sin, cos
import numpy as np
import matplotlib.pyplot as plt
from collections import deque

G_dp = 9.8  # acceleration due to gravity, in m/s^2

class double_pendulum:
  # variables
  length = np.array([0,0])
  total_length = 0
  mass = np.array([0,0])
  angle = np.array([0,0])
  angular_velocity = np.array([0,0])
  state = np.radians([0, 0, 0, 0])

  # constructor
  def __init__(self,length=[1,1],mass=[1,1],angle=[120.0,-10.0],angular_velocity=[0,0],name="newDoublePendulum"):
    self.length=np.array(length)
    self.total_length = length[0] + length[1]
    self.mass=np.array(mass)
    self.angle=np.array(angle)
    self.angular_velocity=np.array(angular_velocity)
    self.state = np.radians([angle[0],angular_velocity[0],angle[1],angular_velocity[1]])
    self.name = name

  def get_pendulum(self,pendulum_id):
    return [self.length[pendulum_id],self.mass[pendulum_id],self.angle[pendulum_id],self.angular_velocity[pendulum_id]]

#### Aditional pendulum functions

In [47]:
# calculate derivatives
def derivs(state,L1,M1,L2,M2):
  
  dydx = np.zeros_like(state)

  dydx[0] = state[1]

  delta_deg = state[2] - state[0]
  den1 = (M1+M2) * L1 - M2 * L1 * cos(delta_deg) * cos(delta_deg)
  dydx[1] = ((M2 * L1 * state[1] * state[1] * sin(delta_deg) * cos(delta_deg)
              + M2 * G_dp * sin(state[2]) * cos(delta_deg)
              + M2 * L2 * state[3] * state[3] * sin(delta_deg)
              - (M1+M2) * G_dp * sin(state[0]))
              / den1)

  dydx[2] = state[3]

  den2 = (L2/L1) * den1
  dydx[3] = ((- M2 * L2 * state[3] * state[3] * sin(delta_deg) * cos(delta_deg)
              + (M1+M2) * G_dp * sin(state[0]) * cos(delta_deg)
              - (M1+M2) * L1 * state[1] * state[1] * sin(delta_deg)
              - (M1+M2) * G_dp * sin(state[2]))
              / den2)

  return dydx

In [48]:
# integrate the Ordinary Differential Equation (ODE) using Euler's method
def integrate_ODE(timesteps,dt,pendulum):
  inner_pendulum = pendulum.get_pendulum(0)
  outer_pendulum = pendulum.get_pendulum(1)
  L1 = inner_pendulum[0]
  M1 = inner_pendulum[1]
  L2 = outer_pendulum[0]
  M2 = outer_pendulum[1]

  y = np.empty((len(timesteps), 4))
  y[0] = pendulum.state
  for i in range(1, len(timesteps)):
      y[i] = y[i - 1] + derivs(y[i - 1],L1,M1,L2,M2) * dt
  return y

### General simulation functions

#### Calculate Simulation

In [49]:
# calculate body simulation
def simulate_on_time_step(bodies,t,steps,interval_in_data): # bodies: list of bodies. t: timestep . steps: number of steps. interval_in_data : frequency of rows taken to dataset: if equals 1 than it is every step
  dataset = []
  global runTime_simulation
  for i in range(steps):
      bodies_pos=[]
      for body in bodies:
        bodies_pos.append(body.position)
      if i % interval_in_data == 0:
        dataset += bodies_pos
      
      for i,body in enumerate(bodies):
        body.setAcceleration([b for b in bodies if b is not body])
        body.setVelocity(t)
        body.setPosition(t)
       
      for body in bodies:
        body.prevPosition = body.position
    
  three_body_simulation = np.array(dataset).reshape(steps//interval_in_data,len(bodies),2)
  three_body_simulation = three_body_simulation.reshape(len(three_body_simulation), len(bodies)*2)
  runTime_simulation = three_body_simulation
  return runTime_simulation

In [50]:
# calculate pendulum simulation
def simulate_double_pendulum(pendulum,timestep=0.01,simulation_time=10):
  global runTime_simulation
  total_length = pendulum.total_length
  inner_pendulum = pendulum.get_pendulum(0)
  outer_pendulum = pendulum.get_pendulum(1)

  # build array of timesteps [0,timestep,2*timestep,...,simulation_time-timestep]
  timesteps = np.arange(0, simulation_time, timestep)

  # integrate the ordinary differential equations
  y = integrate_ODE(timesteps,timestep,pendulum)

  # calculate pendulum positions
  x1 = inner_pendulum[0]*sin(y[:, 0])
  y1 = -inner_pendulum[0]*cos(y[:, 0])

  x2 = outer_pendulum[0]*sin(y[:, 2]) + x1
  y2 = -outer_pendulum[0]*cos(y[:, 2]) + y1

  double_pendulum_simulation = np.transpose(np.array([x1,y1,x2,y2]))
  runTime_simulation = double_pendulum_simulation
  return runTime_simulation

#### Create Simulation

In [51]:
# create random body simulation
def createRandomSimulation(numOfBodies,timestep,steps,interval_in_data=1):
  bodies =[]
  for i in range(numOfBodies):
    bodies.append( Body( np.array([np.random.uniform(-1,1),np.random.uniform(-1,1)]), np.array([np.random.uniform(-1,1) , np.random.uniform(-1,1)]) , 1 ) )

  simulation = simulate_on_time_step(bodies,timestep,steps,interval_in_data)
  simulation=simulation.reshape(len(simulation), numOfBodies*2 )
  return simulation

#### Display Simulation

In [52]:
# display the simulation (image)
import anvil.mpl_util
from PIL import Image
import io

def drawSimulation(simulation, simulation_name):
  prev = np.copy(simulation[0])
  numOfBodies = len(simulation[0])
  from random import randint
  colors = []

  for i in range(30):
    colors.append('#%06X' % randint(0, 0xFFFFFF))

  #colors = ['red','green','blue','yellow','black','brown','purple','pink',]
  mpl_fig = plt.figure()
  print(type(simulation))
  print(simulation)
  for i,sim in enumerate(simulation):
    #if i == 0:
      #continue
    for j in range(0,len(sim),2):
      plt.plot([prev[j],sim[j]],[prev[j+1],sim[j+1]],  color = colors[j // 2])
      prev[j] = np.copy(sim[j])
      prev[j+1] = np.copy(sim[j+1])

  plt.savefig(f"/content/tmp/{simulation_name}")
  img = Image.open(f"/content/tmp/{simulation_name}.png",mode='r')
  bs = io.BytesIO()
  img.save(bs,format="PNG")
  return anvil.BlobMedia("image/png", bs.getvalue(), name=simulation_name)

In [53]:
# display body movment (image)
def showBodiesMovmentInGraph(simulation):
  plt.title("X body coordinate")
  for i in range(0,len(simulation.columns),2):
      df = simulation.iloc[:,i]
      plt.plot(df)
  plt.show()
  plt.title("Y body coordinate")
  for i in range(1,len(simulation.columns),2):
      df = simulation.iloc[:,i]
      plt.plot(df)
  plt.show()

### File Manipulation

In [54]:
# create temp folder
import os

@anvil.server.callable
def create_tmp_folder():
  path = os.path.join('/content/', 'tmp')
  if os.path.exists(path):
    return
  os.mkdir(path)

In [55]:
# remove temp folder
import shutil

@anvil.server.callable
def remove_tmp_folder():
  dir_path = '/content/tmp'
  shutil.rmtree(dir_path, ignore_errors=True)

In [56]:
remove_tmp_folder()

In [57]:
# save current simulation as a .csv file
def saveSimulation(simulation,numOfBodies,simulation_name):
  df = numpyToPandas(simulation,numOfBodies)
  df.to_csv(f'/content/tmp/{simulation_name}.csv',index=False)
  
# load simulation from a .csv file
def loadSimulation(df):
  global runTime_simulation
  runTime_simulation = pd.DataFrame.to_numpy(df)
  return runTime_simulation

In [58]:
import anvil.media

# transfer a saved .csv file from colab to anvil
@anvil.server.callable
def get_sim_csv(simulation_name):
  f = open(f'/content/tmp/{simulation_name}.csv', 'r')
  data = f.read().encode('utf_8')
  return anvil.BlobMedia("text/csv", data , name=simulation_name)

In [59]:
import pandas as pd

# transfer a loaded .csv file from anvil to colab
@anvil.server.callable
def store_data(file,file_name):
  with anvil.media.TempFile(file) as f:
    df = pd.read_csv(f)
    df.to_csv(f'/content/tmp/{file_name}.csv',index=False)

#### Simulation Filters

In [60]:
# Add random noise to the simulation
def addNoise(simulation):
  noise = np.random.normal(0, .1, simulation.shape)
  return simulation + noise

# Remove a body from the simulation
def removeBody(simulation, bodyNum):
  del1 = np.delete(simulation,(bodyNum-1)*2,1)
  return np.delete(del1,(bodyNum-1)*2,1)

### Additional functions

In [61]:
# convert simulation to fit panda's format
def numpyToPandas(simulation,numOfBodies):
  columns = []

  for i in range(numOfBodies):
    columns.append(f'Body{i+1} x')
    columns.append(f'Body{i+1} y')
  simulation_reshaped = simulation.reshape(len(simulation), numOfBodies*2 )

  df = pd.DataFrame(simulation_reshaped, columns = columns)
  return df

In [62]:
# preprocess simulation
def preprocessSimulation(simulation):
  shape = simulation[0].shape
  newSimulation = np.append(simulation[1:],[np.zeros(shape)],axis=0) - simulation
  newSimulation = newSimulation[:-1]
  for i,s in enumerate(newSimulation):
    s = s / np.linalg.norm(simulation[i])
  return newSimulation

### Anvil - simulation functions

In [63]:
@anvil.server.callable
def make_body_simulation(simulation_name,bodylist,timesteps,steps,interval_in_data):
  bodies = initialize_bodies(bodylist)
  sim = simulate_on_time_step(bodies,timesteps,steps,interval_in_data)
  saveSimulation(sim,len(bodylist),simulation_name)
  #showBodiesMovmentInGraph(sim).show()
  return drawSimulation(sim,simulation_name)

In [64]:
# create pendulum simulation
@anvil.server.callable
def make_pendulum_simulation(simulation_name,doublePendulum,simulation_time,timestep):
  inner_pendulum = doublePendulum[0]
  outer_pendulum = doublePendulum[1]
  pendulum = double_pendulum([inner_pendulum[0],outer_pendulum[0]],[inner_pendulum[1],outer_pendulum[1]],
                             [inner_pendulum[2],outer_pendulum[2]],[inner_pendulum[3],outer_pendulum[3]],
                             simulation_name)
  sim = simulate_double_pendulum(pendulum,timestep,simulation_time)
  saveSimulation(sim,2,simulation_name)
  return drawSimulation(sim,simulation_name)

In [65]:
import numpy as np
from matplotlib import image

@anvil.server.callable
def make_simulation_from_csv(simulation_name):
  df = pd.read_csv(f'/content/tmp/{simulation_name}.csv')
  simulation = loadSimulation(df)
  return drawSimulation(simulation,simulation_name)

## Predictor

Prepare Simulation

In [66]:
# prepare simulation for Model training
def prepareData(simulation,window_size_X,window_size_y): 
  X = []
  y = []
  for i in range(len(simulation)-window_size_X - window_size_y):
    row = [bodies for bodies in simulation[i:i+window_size_X]]  
    X.append(row)
    label =  [bodies for bodies in simulation[i+window_size_X:i+window_size_X + window_size_y]]
    y.append(label)

  return np.array(X) , np.array(y)

Model Pre-Training

In [67]:
# create random training set
def createRandomTrainingSetForPreTrainedModel():
    randomSimulations = []
    numOfSimulations = 5
    X_trainArr = [None]*numOfSimulations
    y_trainArr = [None]*numOfSimulations
    for i in range(numOfSimulations):
      randomSimulations.append(createRandomSimulation(4,timestep,steps,interval_in_data))
      X_trainArr[i], y_trainArr[i] = prepareData(train,windowSizeX,windowSizeY)

 #   for i in range(len(randomSimulations)):
      saveSimulation(randomSimulations[i],numOfBodies,f"{numOfBodies}-Body Simulation{i+20}")


    return X_trainArr, y_trainArr

In [68]:
# train model on random simulation
def trainOnRandomSimulations(epochs, numOfSimulations, X_trainArr,y_trainArr):

    for i in range(epochs):
      for j in range(numOfSimulations):
      #model.fit(X_train, y_train, validation_data=(X_val, y_val), epochs=2, callbacks=[cp1])
      #history = model.fit(X_train, y_train, epochs=1, callbacks=[cp1])
        history = model.fit(X_trainArr[i], y_trainArr[i], epochs=1, callbacks=[cp1])

In [69]:
### Inception


from tensorflow.keras.layers import *
from tensorflow.keras.models import Model

def inception_module(x, base_channels=32):
  #a = Conv2D(base_channels*2, 1, 1, activation='relu')(x)
  
  
  b_1 = Conv2D(base_channels*2, 3, 1, activation='relu')(x)
  b_2 = Conv2D(base_channels*4, 3, 1, padding='same', activation='relu')(b_1)

  c_1 = Conv2D(base_channels, 3, 1, activation='relu')(x)
  c_2 = Conv2D(base_channels, 5, 1, padding='same', activation='relu')(c_1)


  d_1 = MaxPooling2D(3, 1, padding='same')(x)
  d_2 = Conv2D(base_channels, 1, 1, activation='relu')(d_1)




  return Concatenate(axis=-1)([b_2, c_2])

def Inceptionmodel(window_size_X,window_size_y,numberOfBodies):

  inp = Input((window_size_X, numberOfBodies*2, 1))

  maps_1 = inception_module(inp)
  maps_2 = inception_module(maps_1, base_channels=16)
  
  #gap = GlobalMaxPooling2D()(maps_2)
  flatten = Flatten()(maps_2)

  output = Dense(window_size_y * numberOfBodies*2, activation='linear')(flatten)
  output = (tf.keras.layers.Reshape([window_size_y, numberOfBodies*2]))(output)
  model = Model(inputs=inp, outputs=output)
  model.summary()
  return model
  
  

Split Simulation Data

In [70]:
# split simulation data to training, validation & testing sets
def splitToTrainValidTest(simulation,testPercentage=0.1):
  trainLen = round((len(simulation)*(1-testPercentage-0.1)))
  valLen = round((len(simulation)*0.1))
  train_set = simulation[:trainLen]
  val_set = simulation[trainLen:trainLen+valLen]
  test_set = simulation[trainLen+valLen:]
  return train_set,val_set,test_set

### Models

In [71]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import *
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.metrics import RootMeanSquaredError
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.preprocessing.sequence import TimeseriesGenerator

#### CNN (Convolutional Neural Network)

In [72]:
def buildModelCNN(window_size_X,window_size_y,numberOfBodies): 
    model = Sequential()
    model.add(InputLayer((window_size_X,numberOfBodies*2,1)))


    model.add(tf.keras.layers.Conv2D(64, 3, padding="same", activation="relu"))
    model.add(tf.keras.layers.Conv2D(32, 3, padding="same", activation="relu"))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.2))
    model.add(tf.keras.layers.Conv2D(32, 3, padding="same", activation="relu"))
    model.add(tf.keras.layers.Conv2D(32, 3, padding="same", activation="relu"))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.2))
    model.add(tf.keras.layers.Conv2D(32, 3, padding="same", activation="relu"))
    # model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Flatten())

    model.add(Dense(units =  window_size_y*numberOfBodies*2, activation ='linear'))
    model.add(tf.keras.layers.Reshape([window_size_y, numberOfBodies*2]))

    model.summary()

    return model

#### LSTM (Long Short-Term Memory)

In [73]:
def buildModelLSTM(window_size_X,window_size_y,numberOfBodies):

  model = keras.Sequential()
  model.add(keras.layers.LSTM(128, kernel_initializer='he_uniform', batch_input_shape=(None, window_size_X, numberOfBodies*2), return_sequences=True, name='encoder_1'))
  model.add(keras.layers.LSTM(64, kernel_initializer='he_uniform', return_sequences=True, name='encoder_2'))
  model.add(keras.layers.LSTM(32, kernel_initializer='he_uniform', return_sequences=True, name='encoder_3'))
  model.add(keras.layers.Flatten())

  model.add(tf.keras.layers.Dense(units =  window_size_y*numberOfBodies*2, activation ='linear'))
  model.add(tf.keras.layers.Reshape([window_size_y, numberOfBodies*2]))
  model.summary()
  return model

#### GRU (Gated Recurrent Unit)

In [74]:
def buildModelGRU(window_size_X,window_size_y,numberOfBodies):

  model = keras.Sequential()
  model.add(keras.layers.GRU(128, kernel_initializer='he_uniform', batch_input_shape=(None, window_size_X, numberOfBodies*2), return_sequences=True, name='encoder_1'))
  model.add(keras.layers.GRU(64, kernel_initializer='he_uniform', return_sequences=True, name='encoder_2'))
  model.add(keras.layers.GRU(32, kernel_initializer='he_uniform', return_sequences=True, name='encoder_3'))

  model.add(Dense(units =  window_size_y*numberOfBodies*2, activation ='linear'))
  model.add(tf.keras.layers.Reshape([window_size_y, numberOfBodies*2]))
  model.summary()
  return model

### Model Prediction

#### Display Model Prediction + Simulation

In [75]:
@anvil.server.callable
def make_model(model_type,windowSizeX=100,windowSizeY=100):
  global runTime_model
  global runTime_simulation
  global history

  global X_train
  global y_train
  global X_val
  global y_val
  global X_test
  global y_test

  numOfBodies = runTime_simulation.shape[1]//2
  # diffSimulation1 = np.diff(runTime_simulation,axis = 0)
  train,val,test = splitToTrainValidTest(runTime_simulation,0.4)
 
  X_train,y_train = prepareData(train, windowSizeX,windowSizeY)
  X_val,y_val = prepareData(val, windowSizeX,windowSizeY)
  X_test,y_test = prepareData(test, windowSizeX,windowSizeY)

  if model_type == "LSTM":
    runTime_model = buildModelLSTM(windowSizeX,windowSizeY,numOfBodies)
  if model_type == "GRU":
    runTime_model = buildModelGRU(windowSizeX,windowSizeY,numOfBodies)
  if model_type == "CNN":
    runTime_model = buildModelCNN(windowSizeX,windowSizeY,numOfBodies)

  cp1 = ModelCheckpoint('runTime_model/', save_best_only=True)
  runTime_model.compile(loss=MeanSquaredError(), optimizer=Adam(learning_rate=0.0001), metrics=[RootMeanSquaredError()])
  history = runTime_model.fit(X_train,y_train, validation_data=(X_val, y_val), epochs=15, callbacks=[cp1],batch_size = 32)

In [76]:
import anvil.mpl_util
from PIL import Image
import io

@anvil.server.callable
def show_loss(simulation_name):

  global history
  plt.clf()
  plt.title('model loss')
  plt.plot(history.history['loss'])
  plt.ylabel('loss')
  plt.xlabel('epoch')
  try:
    plt.plot(history.history['val_loss'])
    plt.legend(['train', 'val'], loc='upper left')
  except:
    print("not enough data for validation graph")

  plt.savefig(f"/content/tmp/{simulation_name}_loss")
  img = Image.open(f"/content/tmp/{simulation_name}_loss.png",mode='r')
  bs = io.BytesIO()
  img.save(bs,format="PNG")
  return anvil.BlobMedia("image/png", bs.getvalue(), name=simulation_name)

In [77]:

@anvil.server.callable
def get_prediction(simulation_name):
  global X_test
  global predictions
  global runTime_model
  predictions = runTime_model.predict(X_test[:1])
  predictions = predictions.squeeze()
  return drawSimulation(predictions,f'{simulation_name}_prediction')

@anvil.server.callable
def get_real(simulation_name):
  global y_test
  real = y_test[:1]
  real = real.squeeze()
  return drawSimulation(real,f'{simulation_name}_real')


## Anvil connection

In [None]:
if __name__ == "__main__":
  try:
    anvil.server.wait_forever()
  except KeyboardInterrupt:
    print("...disconnected from anvil")