#### VAE NOTEBOOK

This notebook applies VAE-coupled FFD to a geometry and illustrates various reconstruction aspects.

In [1]:
import matplotlib.pyplot as plt
import numpy as np
import random
import torch

from scipy.stats import qmc, norm
from torch.utils.data import TensorDataset, DataLoader

from aero_optim.geom import get_curvature
from aero_optim.ffd.ffd import FFD_2D
from aero_optim.ffd.ffd_vae_models import MLPEncoder, MLPDecoder, CNNEncoder, CNNDecoder, VAE, VAETrainer

# Latex consistent figures
plt.rcParams.update({
    "text.usetex": True,
    "font.family": "Times",
    "figure.dpi": 300,
    "font.size": 8,
    'legend.fontsize': 8, 
    "axes.titlesize": 8,
    "axes.labelsize": 8
})

The notebook input variables are:

- `seed` the sampling seed 
- `ncontrol` the number of FFD control points on each side of the lattice box
- `bounds` the FFD deformation boundaries
- `nprofile` the number of FFD deformed profiles used to build the FFD dataset
- `latent_dim` the latent space dimension of the VAE model
- `file` the path to the file containing the geometry coordinates

**Note**: the number of FFD design variables is `2 * ncontrol`, the number of VAE design variables is `latent_dim`

In [2]:
seed = 123
torch.manual_seed(seed)
ncontrol = 4
bounds = (-0.2, 0.2)
nprofile = 1000
latent_dim = 4
file = "/home/mschouler/Documents/Sorbonne/aero-optim/examples/RAE2822/data/rae2822.dat"

The `FFD_2D` object is created

In [3]:
ffd = FFD_2D(file, ncontrol)

A random LHS sampler is built and used to sample the FFD dataset

**Note**: this should take between 5 (RAE2822) to 10 (LRN-CASCADE) seconds

In [4]:
sampler = qmc.LatinHypercube(d=int(2 * ncontrol), seed=seed)
train_sample = sampler.random(n=nprofile)
train_sample = qmc.scale(train_sample, *bounds)
val_sample = sampler.random(n=int(0.2 * nprofile))
val_sample = qmc.scale(val_sample, *bounds)

profiles = []
for Delta in train_sample:
    profiles.append(ffd.apply_ffd(Delta))
for Delta in val_sample:
    profiles.append(ffd.apply_ffd(Delta))

The VAE training and evaluation datasets are generated 

In [None]:
Y  = np.stack([p[:, -1] for p in profiles] , axis=0, dtype=np.float32)
print(f"Y shape: {Y.shape}")

Y_train = Y[:nprofile]
Y_val = Y[nprofile:]

mean = np.mean(Y_train, axis=0)
std = np.std(Y_train, axis=0) + 1e-6

Y_train = (Y_train - mean) / std
Y_val = (Y_val - mean) / std

Y_train_torch = torch.from_numpy(Y_train.astype(np.float32))
Y_val_torch = torch.from_numpy(Y_val.astype(np.float32))

train_set = TensorDataset(Y_train_torch)
val_set = TensorDataset(Y_val_torch)

The VAE model is defined with MLP or CNN achitectures:

- `inner_dim` the number of neurones per hidden layer for the models dense layers 
- `w_kl` the weight of the Kullback Leibler divergence loss
- `encoder` the encoder model
- `decoder` the decoder model
- `VAEmodel` the VAE model
- `optimizer` the VAE optimizer
- `batch_size` the bastch size of the dataloaders
- `epochs` the number of training epochs

In [6]:
input_dim = profiles[0].shape[0]
inner_dim = [512, 256, 128]
w_kl = 1e-4

encoder = MLPEncoder(input_dim, inner_dim, latent_dim, torch.nn.SiLU(), batch_norm=True)
decoder = MLPDecoder(input_dim, inner_dim, latent_dim, torch.nn.SiLU(), batch_norm=True)

# encoder = CNNEncoder(input_dim, latent_dim, torch.nn.functional.silu, kernel_size=10)
# decoder = CNNDecoder(encoder.lin_dim, latent_dim, torch.nn.functional.silu, kernel_size=10)

VAEmodel = VAE(encoder, decoder, w_kl=w_kl)
VAEmodel.to(VAEmodel.device)
optimizer = torch.optim.AdamW(VAEmodel.parameters(), lr=1e-3, weight_decay=1e-2)

batch_size = 256
train_loader = DataLoader(train_set, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_set, shuffle=True, batch_size=batch_size)
epochs = 200

The model is trained in 10 steps and the reconstruction error is computed at each step

In [None]:
trainer = VAETrainer(VAEmodel, optimizer)
errors = []

for i in range(10):
    trainer.train(train_loader, val_loader, epochs // 10)
    VAEmodel.eval()
    Y_pred = VAEmodel(Y_train_torch.to(VAEmodel.device)).x_recon.detach().cpu().numpy() * std + mean
    error = np.sqrt(
        np.sum(
            [np.sum((y_pred - y_true)**2)
             for y_pred, y_true in zip(Y_pred, Y_train * std + mean)]
        ) / nprofile
    )
    errors.append(error)
    print(f"reconstruction error after {(i + 1) * epochs // 10} epochs: {error}")

train_losses = trainer.train_losses
val_losses = trainer.val_losses

The losses and the reconstruction errors are plotted

**Note**: the POD reconstruction error `POD_error` can be specified here if the POD notebook was executed first.

In [None]:
POD_error = None

_, ax = plt.subplots(2, 1, figsize=(3.65, 3.65), sharex=True)
ax[0].plot(range(1, epochs * len(train_loader) + 1), train_losses, label="training loss")
ax[0].plot([ii * len(train_loader) for ii in range(1, epochs + 1)], val_losses, label="validation loss")
ax[0].set_yscale("log")

ax[1].set_ylabel("RMSE [m]")
if POD_error:
    ax[1].hlines(
        y=POD_error, xmin=epochs // 10 * len(train_loader),
        xmax=epochs * len(train_loader) + 1,
        color='lightgray', linestyle='-', label="POD"
    )
ax[1].plot(
    range(epochs // 10 * len(train_loader), epochs * len(train_loader) + 1, epochs // 10 * len(train_loader)),
    errors, color="r", marker="s", label="VAE"
)

ax[0].legend(loc="upper right")
ax[0].set(ylabel=f'MSE + ${w_kl}\\times KL$ $[-]$')
ax[1].legend(loc="upper right")
ax[1].set(xlabel="batch $[-]$", )
plt.tight_layout()
plt.show()

Random profiles from the FFD dataset and their reduced reconstruction are plotted

In [None]:
fig, ax = plt.subplots(figsize=(3.64, 2))
for ii in range(5):
    idx = random.randint(0, nprofile - 1)
    plt.plot(ffd.pts[:, 0], Y_train[idx] * std + mean)
    plt.plot(ffd.pts[:, 0], Y_pred[idx], linestyle="dashed", color="k")
ax.set(xlabel="$x$ [m]", ylabel="$y$ [m]", title=f"Reconstructed profiles with {latent_dim} latent dimensions")

The VAE boundaries are inferred from the modal coefficient min/max values

In [10]:
VAEmodel.eval()
latent_space = VAEmodel.encoder(Y_train_torch.to(VAEmodel.device)).detach().cpu().numpy()

l_bound = np.array([min(v) for v in latent_space.transpose()[:latent_dim]])
u_bound = np.array([max(v) for v in latent_space.transpose()[:latent_dim]])

The non-conservation of the FFD maximal deformations is illustrated with non-constrained LHS sampling

In [11]:
min_profile = ffd.apply_ffd(np.array([bounds[0]] * 2 * ncontrol))
max_profile = ffd.apply_ffd(np.array([bounds[-1]] * 2 * ncontrol))

In [12]:
new_bounds = (l_bound, u_bound)
new_sampler = qmc.LatinHypercube(d=latent_dim, seed=seed)
new_sample = new_sampler.random(n=100)
lhs_sample = qmc.scale(new_sample, *new_bounds)

lhs_output = VAEmodel.decoder(torch.from_numpy(lhs_sample.astype(np.float32)).to(VAEmodel.device)).detach().cpu().numpy()

In [None]:
fig, ax = plt.subplots(figsize=(3.15, 2))
for out_id, out in enumerate(lhs_output):
    if out_id == 0:
        plt.plot(
            ffd.pts[:, 0], out * std + mean,
            linestyle="solid", color="lightgrey", linewidth=0.5, label="random VAE profiles"
        )
    else:
        plt.plot(ffd.pts[:, 0], out * std + mean, linestyle="solid", color="lightgrey", linewidth=0.5)
plt.plot(ffd.pts[:, 0], ffd.pts[:, 1], color="k", label="baseline", linewidth=0.5)
ax.plot(min_profile[:, 0], min_profile[:, 1], color="k", linewidth=1, linestyle="dashed", label="min/max FFD profiles")
ax.plot(max_profile[:, 0], max_profile[:, 1], color="k", linewidth=1, linestyle="dashed")
ax.legend()
ax.set(xlabel="$x$ [m]", ylabel="$y$ [m]")
plt.tight_layout()
plt.show()

Smooth but similar reduced profiles are obtained via constrained LHS sampling i.e. within one standard deviation

In [14]:
d_mean = np.mean(latent_space[:, :latent_dim], axis=0)
d_std = np.std(latent_space[:, :latent_dim], axis=0)

new_bounds = (d_mean - d_std, d_mean + d_std)
c_lhs_sample = qmc.scale(new_sample, *new_bounds)

c_lhs_output = VAEmodel.decoder(torch.from_numpy(c_lhs_sample.astype(np.float32)).to(VAEmodel.device)).detach().cpu().numpy()

In [None]:
fig, ax = plt.subplots(figsize=(3.15, 2))
for out_id, out in enumerate(c_lhs_output):
    if out_id == 0:
        plt.plot(
            ffd.pts[:, 0], out * std + mean,
            linestyle="solid", color="lightgrey", linewidth=0.5, label="random VAE profiles"
        )
    else:
        plt.plot(ffd.pts[:, 0], out * std + mean, linestyle="solid", color="lightgrey", linewidth=0.5)
plt.plot(ffd.pts[:, 0], ffd.pts[:, 1], color="k", label="baseline", linewidth=0.5)
ax.plot(min_profile[:, 0], min_profile[:, 1], color="k", linewidth=1, linestyle="dashed", label="min/max FFD profiles")
ax.plot(max_profile[:, 0], max_profile[:, 1], color="k", linewidth=1, linestyle="dashed")
ax.legend()
ax.set(xlabel="$x$ [m]", ylabel="$y$ [m]")
plt.tight_layout()
plt.show()

Examples of 32 VAE reduced profiles

In [None]:
fig, ax = plt.subplots(4, 8, figsize=(10, 5))

for ii, aa in enumerate(ax):
    for jj, bb in enumerate(aa):
        bb.plot(ffd.pts[:, 0], c_lhs_output[ii * 8 + jj] * std + mean, color="k", linewidth=2)
        bb.axis("off")
        bb.set_aspect(4)

plt.show()

The latent space distributions are plotted

**Note**: if the latent space dimension `latent_dim` is changed, the sub-figure structure must be adapted

In [17]:
def fit_norm(bin_borders, latent_space):
    (mu, sigma) = norm.fit(latent_space)
    print(f"Normal curve fit popt {mu}, {sigma}")
    x = np.linspace(bin_borders[0], bin_borders[-1], 10000)
    y = norm.pdf(x, mu, sigma)
    return x, y

In [None]:
fig = plt.figure(figsize=(4.5, 4.5))
fig.subplots_adjust(hspace=0.4, wspace=0.4)
ax1 = plt.subplot(2, 2, 1)  # dim 1
ax2 = plt.subplot(2, 2, 2)  # dim 2
ax3 = plt.subplot(2, 2, 3)  # dim 3
ax4 = plt.subplot(2, 2, 4)  # dim 4
# ax1
n, bins, _ = ax1.hist(latent_space[:, 0], bins=20, density=1, linewidth=0.5, edgecolor="white")
# fit
x, y = fit_norm(bins, latent_space[:, 0])
ax1.plot(x, y, 'r--', linewidth=2)
ax1.set(xlabel="$\\alpha_1$ [-]", ylabel="$N$ [-]", title="a) distribution /dim 1")

# ax2
n, bins, _ = ax2.hist(latent_space[:, 1], bins=20, density=1, linewidth=0.5, edgecolor="white")
x, y = fit_norm(bins, latent_space[:, 1])
ax2.plot(x, y, 'r--', linewidth=2)
ax2.set(xlabel="$\\alpha_2$ [-]", ylabel="$N$ [-]", title="a) distribution /dim 2")

# ax3
bin_heights, bin_borders, _ = ax3.hist(latent_space[:, 2], bins=20, density=1, linewidth=0.5, edgecolor="white")
x, y = fit_norm(bin_borders, latent_space[:, 2])
ax3.plot(x, y, 'r--', linewidth=2)
ax3.set(xlabel="$\\alpha_3$ [-]", ylabel="$N$ [-]", title="a) distribution /dim 3")

# ax4
bin_heights, bin_borders, _ = ax4.hist(latent_space[:, 3], bins=20, density=1, linewidth=0.5, edgecolor="white")
x, y = fit_norm(bin_borders, latent_space[:, 3])
ax4.plot(x, y, 'r--', linewidth=2)
ax4.set(xlabel="$\\alpha_4$ [-]", ylabel="$N$ [-]", title="a) distribution /dim 4")

VAE reduced profiles curvature

In [None]:
idx = random.randint(0, len(c_lhs_output) - 1)
y = c_lhs_output[idx] * std + mean

# Compute curvature
bsl_curvature = get_curvature(ffd.pts)
vae_curvature = get_curvature(np.column_stack([ffd.pts[:, 0], y]))

# Plot curvature
fig, ax = plt.subplots(2, 1, figsize=(3.65, 3.65))
ax[0].plot(ffd.pts[:, 0], ffd.pts[:, 1], color='b', label='baseline shape')
ax[0].plot(ffd.pts[:, 0], y, color='r', label='VAE shape')
ax[0].legend()
ax[0].set_ylabel('$y$ [m]')

ax[1].plot(ffd.pts[:, 0], bsl_curvature, label='baseline curvature', color='b')
ax[1].plot(ffd.pts[:, 0], vae_curvature, label='VAE curvature', color='r')
ax[1].set_yscale('log')
ax[1].legend()
ax[1].set_xlabel('$x$ [m]')
ax[1].set_ylabel('curvature [m$^{-1}$]')
plt.show()
