In [1]:
import torch
import numpy as np
import scipy as sp
from torch.nn import MSELoss

from torch.utils.data.dataset import TensorDataset
from torch.utils.data.dataloader import DataLoader
from torch.utils.data.sampler import RandomSampler
from torch import Tensor, nn

from scipy.ndimage import gaussian_filter1d
from tqdm import tqdm

from itertools import chain

from koopman.simulation.simulator import simulate_batch
from koopman.simulation.systems import Pendulum

In [2]:
def generate_random_smooth_controls(N, T):
    dt = 0.05
    w = np.random.randn(N, T, 1) * np.sqrt(dt)
    b = np.cumsum(w, axis=1)
    b_smooth = gaussian_filter1d(b, sigma=10, axis=1)

    return b_smooth

In [3]:
params = Pendulum.Params(
    m=1, l=1, g=9.81, b=0.0
)

pendulum = Pendulum(params)

tf = 5.0
dt = 0.05
N = 5_000

theta0 = np.random.uniform(-np.pi, np.pi, (N, 1))
omega0 = np.random.uniform(-3, 3, (N, 1))
x0 = np.hstack((theta0, omega0))

U = generate_random_smooth_controls(N, int(tf/dt))

ts, xhist, uhist = simulate_batch(
    sys=pendulum,
    tf=tf,
    dt=dt,
    u=U,
    x0=x0
)

split = 0.8
N_train = int(N * split)
N_eval = N - N_train

xhist_train, uhist_train = xhist[:N_train], uhist[:N_train]
xhist_eval, uhist_eval = uhist[N_train:], uhist[N_train:]


Simulation progress: 100%|██████████| 100/100 [00:00<00:00, 2005.61it/s]


In [4]:
# Prepare training
traj_per_batch = 5

dataset_train = TensorDataset(torch.tensor(xhist_train, dtype=torch.float32),
                              torch.tensor(uhist_train, dtype=torch.float32))
dataset_eval = TensorDataset(torch.tensor(xhist_eval, dtype=torch.float32),
                             torch.tensor(xhist_eval, dtype=torch.float32))

dataloader_train = DataLoader(dataset_train,
                              batch_size=traj_per_batch,
                              shuffle=True)
dataloader_eval = DataLoader(dataset_eval,
                             batch_size=traj_per_batch,
                             shuffle=False)

reconstruction_loss = MSELoss()
evolution_loss = MSELoss()

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [10]:
class AutoEncoder(nn.Module):
    def __init__(self, nx, nz, n_hidden=None):
        super().__init__()
        if n_hidden is None:
            n_hidden = nz
        self.nx, self.nz, self.n_hidden = nx, nz, n_hidden
        self.encoder = nn.Sequential(
            nn.Linear(nx, n_hidden),
            nn.GELU(),
            nn.Linear(n_hidden, n_hidden),
            nn.GELU(),
            nn.Linear(n_hidden, nz),
        )
        self.decoder = nn.Sequential(
            nn.Linear(nz, n_hidden),
            nn.GELU(),
            nn.Linear(n_hidden, n_hidden),
            nn.GELU(),
            nn.Linear(n_hidden, nx)
        )

    def forward(self, x):
        z = self.encoder(x)
        return self.decoder(z)

    def encode(self, x):
        return self.encoder(x)

    def decode(self, z):
        return self.decoder(z)


In [8]:
from torch import optim

# Train
n_epochs = 50
nx, nu = 2, 1
nz_x = 20
nz_u = 20

autoenc_x = AutoEncoder(nx, nz_x).to(device)
autoenc_u = AutoEncoder(nx, nz_u).to(device)

A = torch.nn.Linear(nz_x, nz_x, bias=False).to(device)
B = torch.nn.Linear(nz_u, nz_x, bias=False).to(device)
nn.init.eye_(A)
nn.init.zeros_(B)

# Cs = list()
# for i in range(nu):
#     Ci = torch.nn.Linear(nz_x, nz_u, bias=False).to(device)
#     nn.init.zeros_(Ci)
#     Cs.append(Ci)


mse_loss = MSELoss()

theta = chain(autoenc_x.parameters(),
              autoenc_u.parameters(),
              A.parameters(),
              B.parameters(),)
              # *[Ci.parameters() for Ci in Cs])

optimizer = optim.Adam(theta, lr=1e-3)

for epoch in range(n_epochs):
    print(f"Epoch {epoch+1}/{n_epochs}")
    iterator = tqdm(dataloader_train, total=len(dataloader_train) - 1, desc="Training")
    for batch_idx, (x, u) in enumerate(iterator):
        # x: (batch_size, traj_len, nx)
        x, u = x.to(device), u.to(device)
        xp = x[:, 1:]
        x, u = torch.reshape(x[:, :-1], (-1, nx)), torch.reshape(u, (-1, nu))
        xp = torch.reshape(xp, (-1, nx))

        optimizer.zero_grad()

        zx = autoenc_x.encode(x)
        zu = autoenc_u.encode(x)
        x_rec = autoenc_x.decode(zx)
        u_rec = autoenc_u.decode(zu)

        reconstr_loss_x = mse_loss(x, x_rec)
        reconstr_loss_u = mse_loss(x, u)

        xp_pred = autoenc_x.decode(
            A(zx) + B(zu)
        )
        evolution_loss = mse_loss(xp, xp_pred)
        loss = reconstr_loss_x + reconstr_loss_u + evolution_loss
        loss.backward()

        optimizer.step()

    print(f"Epoch {epoch+1}, L_reconstr,x: {reconstr_loss_x.item():.4f}, "
          f"L_reconstr,u: {reconstr_loss_u.item():.4f}, ",
          f"L_evolution: {evolution_loss.item():.4f}, ",
          f"L_total: {loss.item():.4f}")



TypeError: empty() received an invalid combination of arguments - got (tuple, dtype=NoneType, device=NoneType), but expected one of:
 * (tuple of ints size, *, tuple of names names, torch.memory_format memory_format = None, torch.dtype dtype = None, torch.layout layout = None, torch.device device = None, bool pin_memory = False, bool requires_grad = False)
 * (tuple of ints size, *, torch.memory_format memory_format = None, Tensor out = None, torch.dtype dtype = None, torch.layout layout = None, torch.device device = None, bool pin_memory = False, bool requires_grad = False)
