In [71]:
# See torchdyn optimal control tutorial and
# [1] Optimal Energy Shaping via Neural Approximators: https://arxiv.org/abs/2101.05537

import torch
import torch.nn as nn
from torch.autograd import grad as grad

TEST_RUN = True

# physical parameters
m, k, l, qr, b, g = 1., 0.5, 1, 0, 0.01, 9.81

class ControlledSystem(nn.Module):
    # Elastic Pendulum Model
    def __init__(self, V, K):
        super().__init__()
        self.V, self.K, self.n = V, K, 1

    def forward(self, t, x):
        # Evaluates the closed-loop vector field
        with torch.set_grad_enabled(True):
            q, p = x[..., :self.n], x[..., self.n:]
            q = q.requires_grad_(True)
            # compute control action
            u = self._energy_shaping(q) + self._damping_injection(x)
            # compute dynamics
            dxdt = self._dynamics(q, p, u)
        return dxdt

    def _dynamics(self, q, p, u):
        # controlled elastic pendulum dynamics
        dqdt = p / m
        dpdt = -k * (q - qr) - m * g * l * torch.sin(q) - b * p / m + u
        return torch.cat([dqdt, dpdt], 1)

    def _energy_shaping(self, q):
        # energy shaping control action
        dVdx = grad(self.V(q).sum(), q, create_graph=True)[0]
        return -dVdx

    def _damping_injection(self, x):
        # damping injection control action
        return -self.K(x) * x[:, self.n:] / m

    def _autonomous_energy(self, x):
        # Hamiltonian (total energy) of the UNCONTROLLED system
        return (m * x[:, 1:] ** 2) / 2. + (k * (x[:, :1] - qr) ** 2) / 2 \
               + m * g * l * (1 - torch.cos(x[:, :1]))

    def _energy(self, x):
        # Hamiltonian (total energy) of the CONTROLLED system
        return (m * x[:, 1:] ** 2) / 2. + (k * (x[:, :1] - qr) ** 2) / 2 \
               + m * g * l * (1 - torch.cos(x[:, :1])) + self.V(x[:, :1])


class AugmentedDynamics(nn.Module):
    # "augmented" vector field to take into account integral loss functions
    def __init__(self, f, int_loss):
        super().__init__()
        self.f = f
        self.int_loss = int_loss
        self.nfe = 0.

    def forward(self, t, x):
        self.nfe += 1
        x = x[:,:2]
        dxdt = self.f(t, x)
        dldt = self.int_loss(t, x)
        return torch.cat([dxdt, dldt], 1)

In [72]:
import pytorch_lightning as pl
import torch.utils.data as data

class EnergyShapingLearner(pl.LightningModule):
    def __init__(self, model: nn.Module, prior_dist, target_dist, t_span, sensitivity='autograd'):
        super().__init__()
        self.model = model
        self.prior, self.target = prior_dist, target_dist
        self.t_span = t_span
        self.batch_size = 2048
        self.lr = 5e-3
        self.weight = torch.Tensor([1., 1.]).reshape(1, 2)

    def forward(self, x):
        return self.model.odeint(x, self.t_span)
    
    def training_step(self, batch, batch_idx):
        # sample a batch of initial conditions
        x0 = self.prior.sample((self.batch_size,))

        # Integrate the model
        x0 = torch.cat([x0, torch.zeros(self.batch_size, 1).to(x0)], 1)
        _, xTl = self(x0)
        xT, l = xTl[-1, :, :2], xTl[-1, :, -1:]

        # Compute loss
        terminal_loss = weighted_log_likelihood_loss(xT, self.target, self.weight.to(xT))
        integral_loss = torch.mean(l)
        loss = terminal_loss + 0.01 * integral_loss
        return {'loss': loss}

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=.999)
        return [optimizer], [scheduler]

    def train_dataloader(self):
        dummy_trainloader = data.DataLoader(
            data.TensorDataset(torch.Tensor(1, 1), torch.Tensor(1, 1)),
            batch_size=1)
        return dummy_trainloader

In [73]:
from torch.distributions import Uniform, Normal

def prior_dist(q_min, q_max, p_min, p_max, device='cuda'):
    # uniform "prior" distribution of initial conditions x(0)=[q(0),p(0)]
    lb = torch.Tensor([q_min, p_min]).to(device)
    ub = torch.Tensor([q_max, p_max]).to(device)
    return Uniform(lb, ub)

def target_dist(mu, sigma, device='cuda'):
    # normal target distribution of terminal states x(T)
    mu, sigma = torch.Tensor(mu).reshape(1, 2).to(device), torch.Tensor(sigma).reshape(1, 2).to(device)
    return Normal(mu, torch.sqrt(sigma))

In [74]:
def weighted_log_likelihood_loss(x, target, weight):
    # weighted negative log likelihood loss
    log_prob = target.log_prob(x)
    weighted_log_p = weight * log_prob
    return -torch.mean(weighted_log_p.sum(1))

class ControlEffort(nn.Module):
    # control effort integral cost
    def __init__(self, f):
        super().__init__()
        self.f = f
    def forward(self, t, x):
        with torch.set_grad_enabled(True):
            q = x[:,:1].requires_grad_(True)
            u = self.f._energy_shaping(q) + self.f._damping_injection(x)
        return torch.abs(u)

In [75]:
from numpy import pi as pi

prior = prior_dist(-2*pi, 2*pi, -2*pi, 2*pi) # Uniform "prior" distribution of initial conditions x(0) 
target = target_dist([0, 0], [.001, .001]) # Normal target distribution for x(T)

# define optimal energy shaping policy networks as in [1]
hdim = 64
V = nn.Sequential(
          nn.Linear(1, hdim),
          nn.Softplus(), 
          nn.Linear(hdim, hdim),
          nn.Tanh(), 
          nn.Linear(hdim, 1))
K = nn.Sequential(
          nn.Linear(2, hdim),
          nn.Softplus(),
          nn.Linear(hdim, 1),
          nn.Softplus())

for p in V[-1].parameters(): torch.nn.init.zeros_(p)
for p in K[-2].parameters(): torch.nn.init.zeros_(p)

# define controlled system dynamics
f = ControlledSystem(V, K)
aug_f = AugmentedDynamics(f, ControlEffort(f))

# define time horizon
# 3,30 default
t_span = torch.linspace(0, 3, 30)

In [76]:
from torchdyn.models import ODEProblem
prob = ODEProblem(aug_f, sensitivity='autograd', solver='rk4')

In [78]:
# train (it can be very slow on CPU) 
# (don't be scared if the loss starts very high)
learn = EnergyShapingLearner(prob, prior, target, t_span)
#trainer = pl.Trainer(max_epochs=650)
if TEST_RUN:
    trainer = pl.Trainer(max_epochs=1, accelerator="gpu")
else:
    trainer = pl.Trainer(max_epochs=750, accelerator="gpu")

trainer.fit(learn)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type       | Params
-------------------------------------
0 | model | ODEProblem | 4.6 K 
-------------------------------------
4.6 K     Trainable params
0         Non-trainable params
4.6 K     Total params
0.018     Total estimated model params size (MB)
  rank_zero_warn(
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

  warn("Setting tolerances has no effect on fixed-step methods")
`Trainer.fit` stopped: `max_epochs=1` reached.


In [79]:
import matplotlib.pyplot as plt
from torchdiffeq import odeint

n_ic = 256
x0 = prior.sample(torch.Size([n_ic])).cpu()
x0 = torch.cat([x0, torch.zeros(n_ic, 1)], 1)
model = aug_f.cpu()
#
traj = odeint(model, x0, t_span, method='midpoint').detach()
traj = traj[..., :-1]

In [80]:
import numpy as np
print(traj.shape)
traj_th = traj[:,:,0]
print(traj_th.shape)
np.savetxt("theta_trajectories.csv", traj_th, delimiter=",")

torch.Size([30, 256, 2])
torch.Size([30, 256])
