In [None]:
# ¯\_(ツ)_/¯ 
# Import libraries
import torch
import torch.nn as nn
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import matplotlib.pyplot as plt

# FrEIA imports
import FrEIA.framework as Ff
import FrEIA.modules as Fm

In [2]:
class IkflowModelParameters:
    """Bundle of hyperparameter settings, each initialised to a fixed default.

    All settings are plain instance attributes, so they can be overwritten
    after construction. `str(params)` lists every attribute currently set on
    the instance, in the order the attributes were assigned.
    """

    def __init__(self):
        """Populate every setting with its default value."""
        self.coupling_layer = "glow"
        self.nb_nodes = 12
        self.dim_latent_space = 9
        self.coeff_fn_config = 3
        self.coeff_fn_internal_size = 1024
        self.permute_random_enabled = True
        self.sigmoid_on_output = False

        # ___ Loss parameters
        self.lambd_predict = 1.0  # Lambda (weight) of the fit loss
        self.init_scale = 0.04473500291638653
        self.rnvp_clamp = 2.5
        self.y_noise_scale = 1e-7  # Noise added to the cartesian poses
        self.zeros_noise_scale = 1e-3  # Noise added to the padding as well

        # ___ Softflow parameters
        self.softflow_noise_scale = 0.01
        self.softflow_enabled = True

    def __str__(self) -> str:
        """Return a header line followed by one `  name: \\tvalue` line per attribute."""
        header = "IkflowModelParameters\n"
        rows = [f"  {name}: \t{value}\n" for name, value in vars(self).items()]
        return header + "".join(rows)


def subnet_constructor(internal_size: int, n_layers: int, ch_in: int, ch_out: int) -> nn.Sequential:
    """Create a MLP with width `internal_size`, depth `n_layers`, and input/output sizes of `ch_in`, `ch_out`

    The network is `n_layers` blocks of `nn.Linear` followed by `nn.LeakyReLU`,
    then a final `nn.Linear` projecting to `ch_out` (so `n_layers + 1` linear
    layers and `2 * n_layers + 1` modules in total).

    Args:
        internal_size (int): Internal width of the network
        n_layers (int): Depth of the MLP; must be 1, 2, 3 or 4
        ch_in (int): Input dimension of the MLP
        ch_out (int): Output dimension of the MLP

    Returns:
        nn.Sequential: The assembled MLP.

    Raises:
        AssertionError: If `n_layers` is not in [1, ..., 4].
    """
    # Raised explicitly rather than via an `assert` statement: `python -O` strips
    # assert statements, which would let an unsupported depth slip through.
    if n_layers not in (1, 2, 3, 4):
        raise AssertionError("Number of layers `n_layers` must be in [1, ..., 4]")

    # Values that merely compare equal to 1..4 (e.g. 2.0) pass the check above;
    # normalise them so the list arithmetic below works.
    depth = int(n_layers)

    # Feature width at every linear boundary: ch_in -> internal_size (x depth) -> ch_out.
    widths = [ch_in] + [internal_size] * depth + [ch_out]

    layers = []
    for fan_in, fan_out in zip(widths[:-2], widths[1:-1]):
        layers.append(nn.Linear(fan_in, fan_out))
        layers.append(nn.LeakyReLU())
    # Output projection has no activation after it.
    layers.append(nn.Linear(widths[-2], widths[-1]))

    return nn.Sequential(*layers)


def inn_model(params: IkflowModelParameters, dim_cond: int, ndim_tot: int):
    
    def subnet_constructor_wrapper(ch_in: int, ch_out: int):
        return subnet_constructor(params.coeff_fn_internal_size, params.coeff_fn_config, ch_in, ch_out)

    # Input node
    input_node = Ff.InputNode(ndim_tot, name="input")
    nodes = [input_node]
    cond = Ff.ConditionNode(dim_cond)

    if params.sigmoid_on_output:
        # First map x from joint space to [0,1]
        nodes.append(get_pre_sigmoid_scaling(ndim_tot, robot, nodes)[0])
        nodes.append(Ff.Node([Node]))