In [None]:
import numpy as np
import torch
import torch.nn as nn
import pandas as pd
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import time
import pydde as d

In [None]:
#Parameters
samplenum = 50
epochs = 200
hiddenlayers = [200,100]
input_size = 3
output_size = 3
learning_rate = 0.01
time_length = 60; #seconds

In [None]:
# Generate simulation
dyn = d.PyDyn('test2.sim', 60)
state_init = dyn.compute(dyn.p_init)
f = dyn.py_f(state_init)
df = dyn.py_df_dp(state_init)
dy = dyn.py_dy_dp(state_init)

In [None]:
#Sample targets only variables in z direction
y_target = np.zeros((samplenum,3))
y_target[:,2] = np.random.rand(samplenum)
#x[:,0] = np.random.rand(samplenum)
y_target[:,1] = 2
y_target= torch.tensor(y_target)

## Building the custon Simulation Activation Function

In [None]:
class Simulate(torch.autograd.Function):
    
    @staticmethod
    def forward(ctx, input):
        #print(f'input: {input.shape}')
        p = input.clone().numpy().transpose()
        state = dyn.compute(p)
        y_pred = torch.tensor(state.y[-3:])
        #print(f'y_pred: {y_pred.dtype}')
        
        ctx.save_for_backward(input)
        
        return y_pred, input
    
    @staticmethod
    def backward(ctx, grad_output, input):
        #print(grad_output.shape)
        input, = ctx.saved_tensors
        p = input.clone().numpy().transpose()
        state= dyn.compute(p)
        dy_dp = dyn.py_dy_dp(state)
        dy_dp = dy_dp[-3:, :]
        #print(f'shape of dy/dp: {dy_dp.shape}')
        #print(f'shape of grad_output: {grad_output.shape}')
        grad_output = grad_output.unsqueeze(0)
        
        grad_input = torch.tensor(dy_dp).t().mm(grad_output.t()).t()
        return grad_input, None

Simulate = Simulate.apply

## Building the Model

In [None]:
class ActiveLearn(nn.Module):

    def __init__(self, n_in, out_sz):
        super(ActiveLearn, self).__init__()

        self.L_in = nn.Linear(n_in, 180)
        self.Relu = nn.ReLU(inplace=True)
        self.P = nn.Linear(180, 180)
        #self.L_out = nn.Linear(3, 3)
    
    def forward(self, input):
        x = self.L_in(input)
        x = self.Relu(x)
        x = self.P(x)
        x = self.Relu(x)
        x, p = Simulate(x)
        #x = self.L_out(x)
        return x, p


In [None]:
model = ActiveLearn(input_size, output_size)

criterion = nn.MSELoss()  # RMSE = np.sqrt(MSE)
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)

y_target = y_target.float()

## Train the model

In [28]:
torch.autograd.set_detect_anomaly(True)

start_time = time.time()

losses = []
y_preds= np.zeros((samplenum, 3))
p_preds= np.zeros((samplenum, 3*time_length))

#y_pred = torch.tensor(y_pred)
for i in range(epochs):
    for s in range(samplenum):
        y_truth = y_target[s, :]
        #print(y_truth.shape)
        #y_truth = y_truth.unsqueeze(0)
        y_pred, p_pred = model(y_truth)
        #print(y_pred.shape)
        y_preds[s, :] = y_pred.detach()
        p_preds[s, :] = p_pred.detach()
        #loss = torch.sqrt(criterion(y_pred.float(), y_truth)) # RMSE
        loss = criterion(y_pred.float(), y_truth) # MSE
        #loss =sum((y_pred.float()-y_truth)**2) 
        losses.append(loss)
        #Clear the gradient buffer (w <-- w - lr*gradient)
        optimizer.zero_grad()
        #Back Prop
        loss.backward()
        optimizer.step()
    print(f'epoch: {i:3}/{epochs}  loss: {loss.item():10.8f}')
    i+=1

print(f'epoch: {i:3} loss: {loss.item():10.8f}') # print the last line
print(f'\nDuration: {(time.time() - start_time)/60:.3f} min') # print the time elapsed

epoch:   0/200  loss: 0.99080276
epoch:   1/200  loss: 1.01049316
epoch:   2/200  loss: 1.26221561
epoch:   3/200  loss: 0.03516496
epoch:   4/200  loss: 0.12425490
epoch:   5/200  loss: 0.12155505
epoch:   6/200  loss: 0.12074237
epoch:   7/200  loss: 0.12124157
epoch:   8/200  loss: 0.12035796
epoch:   9/200  loss: 0.12023153
epoch:  10/200  loss: 0.12017441
epoch:  11/200  loss: 0.12012833
epoch:  12/200  loss: 0.12009065
epoch:  13/200  loss: 0.12006062
epoch:  14/200  loss: 0.12003334
epoch:  15/200  loss: 0.12000012


KeyboardInterrupt: 

In [None]:
#Save Model

if len(losses) == epochs*(samplenum):
    torch.save(model.state_dict(), 'Trained_Model_dyn_100s100e.pt')
    print('Model saved')
else:
    print('Model has not been trained. Consider loading a trained model instead.')

## Grad Check

In [None]:
from torch.autograd import gradcheck

# gradcheck takes a tuple of tensors as input, check if your gradient
# evaluated with these tensors are close enough to numerical
# approximations and returns True if they all verify this condition.
input = (x_train, y_train)
test = gradcheck(Simulate, input, eps=1e-6, atol=1e-4)
print(test)

## Test forward propagation

In [None]:
p= np.random.rand(180)
p= torch.tensor(p)
y_pred, p_pred = Simulate(p)
y_pred = y_pred.clone().numpy()

yTraj_test = sim.compute(p_pred)

print(y_pred)
print(yTraj_test[-3:])
print(np.sum(yTraj_test[-3:]-y_pred))