In [None]:
import torch
import sys
import os
from matplotlib import pyplot as plt
import numpy as np
import matplotlib.cm as cm
import scipy
from datetime import datetime
import json

module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
	sys.path.insert(0, module_path)
print(sys.path)

import Double_Pendulum.Learning.autoencoders as autoencoders
import Double_Pendulum.robot_parameters as robot_parameters
import Double_Pendulum.transforms as transforms
import Double_Pendulum.dynamics as dynamics
import Plotting.pendulum_plot as pendulum_plot

from functools import partial



%load_ext autoreload
%autoreload 2


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
rp = robot_parameters.LUMPED_PARAMETERS
plotter = pendulum_plot.Anim_plotter(rp)
neural_net = True

fix_q = True
model_cw = False
model_shifting = True

if fix_q and not neural_net:
	fix_q = False
	print("Analytic transform used, so fix_q set to False. Did you check your inputs properly?")

if neural_net:
	model = autoencoders.Autoencoder_double(rp).to(device)
	model_location = '../Double_Pendulum/Learning/Models/NN_202505141642(half-q)/NN_202505141642_0.pth'
	model.load_state_dict(torch.load(model_location, weights_only=True))
else:
	model_location = "N/A"
	model = autoencoders.Analytic_transformer(rp)

In [None]:
# When controlling on th_0
#"""
Kp = torch.tensor([[100., 0.], 
				   [0., 0.]]).to(device)
Kd = torch.tensor([[40., 0.], 
				   [0., 0.]]).to(device)
#"""

# When controlling on th_1, just for testing
"""
Kp = torch.tensor([[0., 0.], 
				   [0., 100.]]).to(device)
Kd = torch.tensor([[0., 0.], 
				   [0., 40.]]).to(device)
"""

# When controlling on both th_0 and th_1, just for testing
"""
Kp = torch.tensor([[100., 0.], 
				   [0., 10.]]).to(device)
Kd = torch.tensor([[40., 0.], 
				   [0., 10.]]).to(device)
"""

#C_damp = torch.tensor([[12., 0.], [0., 12.]]).to(device)

In [None]:
xy_start = torch.tensor([0., -2.]).requires_grad_().to(device)
q_start = transforms.inverse_kinematics(xy_start, rp, is_clockwise=model_cw).unsqueeze(0)
q_d_start = torch.tensor([[0., 0.]]).requires_grad_().to(device)
q_dd_start = torch.tensor([[0., 0.]]).requires_grad_().to(device)

is_clockwise_start = transforms.check_clockwise(q_start.squeeze(0))

xy_des_real = torch.tensor([2., -2.]).requires_grad_().to(device)
q_des = transforms.inverse_kinematics(xy_des_real, rp, is_clockwise=model_cw).unsqueeze(0)
q_d_des = torch.tensor([[0., 0.]]).requires_grad_().to(device)

th_start = model.encoder_vmap(q_start)
th_d_start = (model.jacobian_enc(q_start) @ q_d_start.T).T
th_dd_start = torch.tensor([[0., 0.]]).requires_grad_().to(device) #TODO: Obtain accelerations in thetas. Do I care?

is_clockwise_des = transforms.check_clockwise(q_des.squeeze(0))

th_des = model.encoder_vmap(q_des)
th_d_des = (model.jacobian_enc(q_des) @ q_d_des.T).T

q_des_est = model.decoder_vmap(th_des, is_clockwise_des)
q_d_des_est = (model.jacobian_dec(th_des, clockwise=is_clockwise_des) @ th_d_des.T).T
xy_des_est, _ = transforms.forward_kinematics(rp, q_des_est[0])

print(q_start)

In [None]:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

th_series, th_d_series, th_dd_series = torch.empty((0,2)).to(device), torch.empty((0,2)).to(device), torch.empty((0,2)).to(device)
q_est_series, q_d_est_series = torch.empty((0,2)).to(device), torch.empty((0,2)).to(device)
q_real_series, q_d_real_series, q_dd_real_series = torch.empty((0,2)).to(device), torch.empty((0,2)).to(device), torch.empty((0,2)).to(device)
u_series = torch.empty((0,1)).to(device)


dt = 0.01
t_end = 15
t_series = torch.arange(0, t_end, dt)

th = th_start
th_d = th_d_start
th_dd = th_dd_start

q_est = model.decoder(th_start, clockwise=is_clockwise_start)
q_d_est = (model.jacobian_dec(th_start, clockwise=is_clockwise_start) @ th_d_start.T).T

q_real, q_d_real, q_dd_real = q_start, q_d_start, q_dd_start

is_clockwise = transforms.check_clockwise(q_start.squeeze(0))


if neural_net and fix_q:
	note_string = "NOTE: The controller expects the NN to be trained on " + ("COUNTER" if not model_cw else "") + "CLOCKWISE data."
	print(note_string)
if model_shifting:
	print("NOTE: The controller expects points in q to be shifted (to make q-space unimodal).")



for t in torch.arange(0, t_end, dt):
	t_string = "Time: [" + str(t.item().__round__(3)) + "/" + str(t_end) + ".0]"
	print(t_string)


	is_clockwise = transforms.check_clockwise(q_real.squeeze(0))
	if fix_q and model_cw != is_clockwise:
		print("fixing value to", "clockwise" if model_cw else "counterclockwise")
		q = transforms.flip_q(rp, q_real.squeeze(0), model_cw).unsqueeze(0)
		q_d = transforms.flip_q_d(rp, q_real.squeeze(0), q_d_real, model_cw)
	else:
		q = q_real
		q_d = q_d_real

	if model_shifting:
		q = transforms.shift_q(q, clockwise=model_cw)
	th = model.encoder_vmap(q)
	th_d = (model.jacobian_enc(q) @ q_d.T).T
	
	q = model.decoder_vmap(th, clockwise=model_cw)
	q_d = (model.jacobian_dec(th) @ th_d.T).T

	""" Obtain Jacobian, dynamical matrices"""

	J_h = model.jacobian_enc(q)
	J_h_trans = torch.transpose(J_h, 0, 1)
	is_clockwise = transforms.check_clockwise(q.squeeze(0))
	
	J_h_inv = model.jacobian_dec(th, is_clockwise).squeeze(0)
	J_h_inv_trans = torch.transpose(J_h_inv, 0, 1)

	M_q_est, C_q_est, G_q_est = dynamics.dynamical_matrices(rp, q.squeeze(0), q_d.squeeze(0))
	A_q_est = dynamics.input_matrix(rp, q.squeeze(0))


	""" Feed-forward simulation of the system, not on real dynamics """

	M_th, C_th, G_th = transforms.transform_dynamical_from_inverse(M_q_est, C_q_est, G_q_est, th, th_d, J_h_inv, J_h_inv_trans)
	A_th = transforms.transform_input_matrix_from_inverse_trans(A_q_est, J_h_inv_trans)

	u = torch.pinverse(A_th) @ ((Kp @ (th_des - th).T + Kd @ (th_d_des - th_d).T))
	u = u + torch.pinverse(A_th) @ G_th

	""" Update the real system and apply latent control input. """

	M_q_real, C_q_real, G_q_real = dynamics.dynamical_matrices(rp, q_real.squeeze(0), q_d_real.squeeze(0))
	A_q_real = dynamics.input_matrix(rp, q_real.squeeze(0))

	tau_q_real = A_q_real * u
	q_dd_real = (torch.pinverse(M_q_real) @ (tau_q_real - C_q_real @ ((q_d_real).T)- G_q_real)).T  #  - C_damp @ ((q_d_real).T) 
	q_d_real = q_d_real + q_dd_real * dt
	q_real = q_real + q_d_real * dt
	q_real = transforms.wrap_to_pi(q_real)
	
	
	q_real_shifted = transforms.shift_q(q_real, clockwise=model_cw)
	th = model.encoder_vmap(q_real_shifted)
	#print(q_real)
	q_est = model.decoder_vmap(th, clockwise=transforms.check_clockwise(q_real.squeeze(0)))
	q_est = transforms.wrap_to_pi(q_est)
	th_d = (model.jacobian_enc(q_real) @ q_d_real.T).T
	q_d_est = (model.jacobian_dec(th, clockwise=is_clockwise) @ th_d.T).T


	""" Store data for plotting """

	th_series = torch.cat((th_series, th.detach()), dim=0)
	th_d_series = torch.cat((th_d_series, th_d.detach()), dim=0)
	th_dd_series = torch.cat((th_dd_series, th_dd.detach()), dim=0)
	
	q_real_series = torch.cat((q_real_series, q_real.detach()), dim=0)
	q_d_real_series = torch.cat((q_d_real_series, q_d_real.detach()), dim=0)
	q_dd_real_series = torch.cat((q_dd_real_series, q_dd_real.detach()), dim=0)

	q_est_series = torch.cat((q_est_series, q_est.detach()), dim=0)
	q_d_est_series = torch.cat((q_d_est_series, q_d_est.detach()), dim=0)

	u_series = torch.cat((u_series, u.detach()), dim=0)

	#print(is_clockwise)
	print("")

	
	


In [None]:
xy_real_series = torch.empty((0,2)).to(device)
xy_est_series = torch.empty((0,2)).to(device)

#for q_real, q_est, q_naive in zip(q_real_series, q_est_series, q_naive_series):
for q_real, q_est in zip(q_real_series, q_est_series):
	xy_real, _ = transforms.forward_kinematics(rp, q_real)
	xy_real_series = torch.cat((xy_real_series, xy_real.unsqueeze(0)))
	xy_est, _ = transforms.forward_kinematics(rp, q_est)
	xy_est_series = torch.cat((xy_est_series, xy_est.unsqueeze(0)))

In [None]:
save_dir = f"collocated_results/simulation_{timestamp}"
os.makedirs(save_dir, exist_ok=True)
print(xy_start[1])
print(Kp[0,0].item())
metadata = {
	"timestamp": timestamp,
	"Learned transform": neural_net,
	"Model location": model_location,
	"Model clockwise": model_cw,
	"Model shifting": model_shifting, 
	"PD gains": {
		"Kp": [[Kp[0,0].item(), Kp[0,1].item()], 
			   [Kp[1,0].item(), Kp[1,1].item()]],  
		"Kd": [[Kd[0,0].item(), Kd[0,1].item()], 
			   [Kd[1,0].item(), Kd[1,1].item()]]   
	},
	"Actuator location": {"x_a": rp["xa"], "y_a": rp["ya"]},
	"xy start": {"x": xy_start[0].item(), "y": xy_start[1].item()},
	"Time step": dt,
	"Sim time": t_end,
}

metadata["PD gains"]["Kp"] = str(metadata["PD gains"]["Kp"])
metadata["PD gains"]["Kd"] = str(metadata["PD gains"]["Kd"])

metadata_path = os.path.join(save_dir, "metadata.json")
with open(metadata_path, "w") as f:
	json.dump(metadata, f, indent=4)

In [None]:
stride = 1
pos_end_q_real, pos_elbow_q_real = torch.vmap(transforms.forward_kinematics, in_dims=(None, 0))(rp, q_real_series[::stride])
pos_end_q_est, pos_elbow_q_est = torch.vmap(transforms.forward_kinematics, in_dims=(None, 0))(rp, q_est_series[::stride])
#pos_end_q_naive, pos_elbow_q_naive = torch.vmap(transforms.forward_kinematics, in_dims=(None, 0))(rp, q_naive_series[::stride])

frames_q_real = plotter.frame_pendulum(pos_end_q_real, pos_elbow_q_real)
frames_q_est = plotter.frame_pendulum(pos_end_q_est, pos_elbow_q_est)
#frames_q_naive = plotter.frame_pendulum(pos_end_q_naive, pos_elbow_q_naive)

data_real = {
	"frames": frames_q_real,
	"times": dt,
	"name": "q_real", 
	"arm_color": "tab:blue",
	"act_color": "tab:cyan"
}

data_est = {
	"frames": frames_q_est,
	"times": dt,
	"name": "q_est",
	"arm_color": "tab:orange", 
	"act_color": "tab:red"
} 

"""
data_naive = {
	"frames": frames_q_naive,
	"times": dt,
	"name": "q_naive",
	"arm_color": "tab:green", 
	"act_color": "tab:olive"
}
"""

ref_pos_real = {
	"pos": xy_des_real,
	"name": "real",
	"color": "tab:blue"
}

ref_pos_est = {
	"pos": xy_des_est,
	"name": "est",
	"color": "tab:orange"
}

"""
ref_pos_naive = {
	"pos": xy_des_real,
	"name": "naive",
	"color": "tab:green"
}
"""

ref_poss = [ref_pos_real]#, ref_pos_est]#, ref_pos_naive]

frames_data = [data_real, data_est]#, data_naive]
#frames_data = [data_est]
#frames_data = [data_real]

name_rp = "RP:(" + str(rp["xa"]) + "," + str(rp["ya"]) + ")_"
name_ref = "ref:(" + str(xy_des_real[0].item()) + "," + str(xy_des_real[1].item()) + ")_"

if neural_net:
	model_type = "NN"
else:
	model_type = "AL"

file_name = "t_end:[" + str(t_end) + "]_dt:[" + str(dt) + "]_stride:[" + str(stride) + "]_0.mp4"
file_counter = 0

output_path = os.path.join(save_dir, file_name)

while os.path.isfile(output_path):
	print("file name already exists")
	file_counter += 1
	file_name = file_name[:-6] + "_" + str(file_counter) + ".mp4"
	output_path = os.path.join(save_dir, file_name)

plotter.animate_pendulum(frames_data, ref_poss=ref_poss, plot_actuator=True, save_path=output_path, fps = 1/(dt*stride), dt = dt*stride)
#plotter.animate_pendulum(frames_data, ref_pos=None, plot_actuator=False, file_name=file_name, fps = 1/(dt*stride), dt = dt*stride)


In [None]:
datasets_q = [
	#{
	#    "name": "est",
	#    "values": q_est_series.cpu().detach().numpy(),
	#    "color": "tab:orange"
	#},
	{
		"name": "real",
		"values": q_real_series.cpu().detach().numpy(),
		"color": "tab:blue"
	}#,
	#{
	#    "name": "naive",
	#    "values": q_naive_series.cpu().detach().numpy(),
	#    "color": "tab:green"
	#}
]
datasets_th = [
	{
		"name": "est",
		"values": th_series.cpu().detach().numpy(),
		"color": "tab:orange"
	}#,56
	#{
	#    "name": "ana",
	#    "values": th_
	#    "color": "tab:blue"
	#}
]

datasets_xy = [
	#{
	#    "name": "est",
	#    "values": xy_est_series.cpu().detach().numpy(),
	#    "color": "tab:orange"
	#},
	{
		"name": "real",
		"values": xy_real_series.cpu().detach().numpy(),
		"color": "tab:blue"
	}#,
	#{
	#    "name": "naive",
	#    "values": xy_naive_series.cpu().detach().numpy(),
	#    "color": "tab:green"
	#}
]

# Common labels for the plots.
name_q = "q trajectory"
name_th = "th trajectory"
name_xy = "xy trajectory"
t_series = torch.arange(0, t_end, dt)

# Create an instance of ErrorPlotter.
ep = pendulum_plot.Error_plotter(rp)

# Prepare plot datasets for each column.
# Each call groups a set of datasets to be drawn in one subplot column.
column1 = ep.create_plot_dataset(t=t_series, datasets=datasets_q, reference=q_des, name=name_q)
column2 = ep.create_plot_dataset(t=t_series, datasets=datasets_th, reference=th_des, name=name_th)
column3 = ep.create_plot_dataset(t=t_series, datasets=datasets_xy, reference=xy_des_real.unsqueeze(0), name=name_xy)
plot_datasets = [column1, column2, column3]

file_name = "Error plot.png"
file_counter = 0

output_path = os.path.join(save_dir, file_name)

# Pass the list of columns (plot_dataset objects) to plot_multi.
ep.plot_multi(plot_datasets=plot_datasets, save_path=output_path, axes_names = ["q", "th", "xy"])

#### Analytic simulation

In [None]:
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

th_ana_series, th_d_ana_series = torch.empty((0,2)).to(device), torch.empty((0,2)).to(device)
q_ana_series, q_d_ana_series, q_dd_ana_series = torch.empty((0,2)).to(device), torch.empty((0,2)).to(device), torch.empty((0,2)).to(device)
u_ana_series = torch.empty((0,1)).to(device)
tau_q_ana_series = torch.empty((0,2)).to(device)


dt = 0.01
t_end = 15
t_series = torch.arange(0, t_end, dt)

th = th_start_ana
th_d = th_d_start_ana


q_ana, q_d_ana, q_dd_ana = q_start, q_d_start, q_dd_start

is_clockwise = False

model = autoencoders.Analytic_transformer(rp)

for t in torch.arange(0, t_end, dt):
	t_string = "Time: [" + str(t.item().__round__(3)) + "/" + str(t_end) + ".0]"
	print(t_string)


	q_ana
	q_d_ana

	th_ana = model.encoder_vmap(q_ana)
	th_d_ana = (model.jacobian_enc(q_ana) @ q_d_ana.T).T
	
	q_hat_ana = model.decoder_vmap(th_ana, clockwise=model_cw)
	q_d_hat_ana = (model.jacobian_dec(th_ana) @ th_d_ana.T).T

	""" Obtain Jacobian, dynamical matrices"""
	
	J_h_inv = model.jacobian_dec(th, is_clockwise).squeeze(0)
	J_h_inv_trans = torch.transpose(J_h_inv, 0, 1)

	M_q_est, C_q_est, G_q_est = dynamics.dynamical_matrices(rp, q_hat_ana.squeeze(0), q_d_hat_ana.squeeze(0))
	A_q_est = dynamics.input_matrix(rp, q_hat_ana.squeeze(0))


	""" Feed-forward simulation of the system, not on real dynamics """

	M_th, C_th, G_th = transforms.transform_dynamical_from_inverse(M_q_est, C_q_est, G_q_est, th_ana, th_d_ana, J_h_inv, J_h_inv_trans)
	A_th = transforms.transform_input_matrix_from_inverse_trans(A_q_est, J_h_inv_trans)

	u = torch.pinverse(A_th) @ ((Kp @ (th_des - th_ana).T + Kd @ (th_d_des - th_d_ana).T))
	u = u + torch.pinverse(A_th) @ G_th

	""" Update the real system and apply latent control input. """

	M_q_real, C_q_real, G_q_real = dynamics.dynamical_matrices(rp, q_ana.squeeze(0), q_d_ana.squeeze(0))
	A_q_real = dynamics.input_matrix(rp, q_ana.squeeze(0))

	tau_q_real = A_q_real * u
	q_dd_ana = (torch.pinverse(M_q_real) @ (tau_q_real - C_q_real @ ((q_d_ana).T)- G_q_real)).T  #  - C_damp @ ((q_d_real).T) 
	q_d_ana = q_d_ana + q_dd_ana * dt
	q_ana = q_ana + q_d_ana * dt
	q_ana = transforms.wrap_to_pi(q_ana)
	
	th_ana = model.encoder_vmap(q_ana)
	th_d_ana = (model.jacobian_enc(q_ana) @ q_d_ana.T).T


	""" Store data for plotting """

	th_ana_series = torch.cat((th_ana_series, th_ana.detach()), dim=0)
	th_d_ana_series = torch.cat((th_d_ana_series, th_d_ana.detach()), dim=0)
	
	q_ana_series = torch.cat((q_ana_series, q_ana.detach()), dim=0)
	q_d_ana_series = torch.cat((q_d_ana_series, q_d_ana.detach()), dim=0)
	q_dd_ana_series = torch.cat((q_dd_ana_series, q_dd_ana.detach()), dim=0)

	u_ana_series = torch.cat((u_ana_series, u.detach()), dim=0)
	tau_q_ana_series = torch.cat((tau_q_ana_series, tau_q_real.T.detach()), dim=0)

	print("")

	
	
