## Variational Auto-Encoder (VAE) PyTorch implementation
Based on the paper by Larsen et al.: "Autoencoding beyond pixels using a learned similarity metric" - https://arxiv.org/abs/1512.09300

### Imports

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


import torch
from torch import nn
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

plt.style.use('seaborn-whitegrid')

In [None]:
# Library options
pd.options.mode.chained_assignment = None  # default='warn'

# Get CPU or GPU device for NN
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")
print(f"CUDA version: {torch.version.cuda}")

### Helper functions

In [None]:
def airfoil_info(airfoil_df, plot_airfoil, mach=0.1, reynolds=1e5):
    """
    Returns a plot of a NACA airfoil's shape, Cl and Cd between alpha = -10 and alpha = 10 degrees. 
    Assumes 15 points for upper surface and 15 points for lower surface.
    Inputs:
        - airfoil_df: pandas dataframe with airfoil data
        - plot_airfoil: index that specifies which airfoil to plot
    Outputs:
        - Plot of airfoil shape, Cl and Cd
    """
    # X coordinates
    x = [0.5*(1-np.cos(ang)) for ang in np.linspace(0,np.pi,17)]
    aux_x = list(reversed([0.5*(1-np.cos(ang)) for ang in np.linspace(0,np.pi,17)[1:16]]))
    [x.append(i) for i in aux_x]
    x.append(0)
    
    first_coord_list = airfoil_df.loc[(airfoil_df['MachNumber'] == mach) & (airfoil_df['ReynoldsNumber'] == reynolds)]['yU_1'].unique()
    plot_airfoil_df = airfoil_df.loc[(airfoil_df['MachNumber'] == mach) & (airfoil_df['ReynoldsNumber'] == reynolds) & (airfoil_df['yU_1']==first_coord_list[plot_airfoil])]
    
    # Y coordinates
    y = []
    origin = (plot_airfoil_df.iloc[1][0]+plot_airfoil_df.iloc[1][15])/2
    y.append(origin)
    [y.append(j) for j in plot_airfoil_df.iloc[1][0:15].values.tolist()]
    y.append(0)
    aux_y = list(reversed(plot_airfoil_df.iloc[1][15:30].values.tolist()))
    [y.append(k) for k in aux_y]
    y.append(origin)
    
    # Cl, Cd, alphas
    Cl = plot_airfoil_df['Cl'].values.tolist()[0:21]
    Cd = plot_airfoil_df['Cd'].values.tolist()[0:21]
    alphas = np.linspace(-10,10,len(Cl))
    
    # Airfoil plot
    plot1 = plt.subplot2grid((2,2), (0,0), colspan = 2)
    plot2 = plt.subplot2grid((2,2), (1,0))
    plot3 = plt.subplot2grid((2,2), (1,1))
    
    plot1.plot(x, y)
    plot1.set_xlim([-0.1,1.1])
    plot1.set_ylim([np.min(y)-0.2*np.abs(np.min(y)),np.max(y)+0.2*np.abs(np.max(y))])
    plot1.set_ylabel('$y/c$')
    plot1.set_xlabel('$x/c$') 
    plot1.set_title('Airfoil plot', fontsize=16)
    
    plot2.plot(alphas, Cl)
    plot2.set_xlim([-10,10])
    plot2.set_ylim([np.min(Cl)-0.1*np.abs(np.min(Cl)),np.max(Cl)+0.1*np.abs(np.max(Cl))])
    plot2.set_ylabel('$C_L$')
    plot2.set_xlabel('$\\alpha$ [$^\\circ$]') 
    
    plot3.plot(alphas, Cd)
    plot3.set_xlim([-10,10])
    plot3.set_ylim([np.min(Cd)-0.1*np.abs(np.min(Cd)),np.max(Cd)+0.1*np.abs(np.max(Cd))])
    plot3.set_ylabel('$C_D$')
    plot3.set_xlabel('$\\alpha$ [$^\\circ$]') 
 
    plt.tight_layout()
    plt.show()
    return

def airfoil_plot(airfoil_coords, fig=None, label=None):
    """
    Returns a plot of an airfoil. Used to visualize output of the optimizer. 
    Assumes 15 points for upper surface and 15 points for lower surface, with cosine spacing.
    Inputs:
        - airfoil_coords: pandas DataFrame with airfoil coordinates and other parameters
    Outputs:
        - Plot of airfoil shape
    """
    if fig==None:
        fig = plt.subplot2grid((1,3), (0,0), colspan = 3)
    # X coordinates
    x = [0.5*(1-np.cos(ang)) for ang in np.linspace(0,np.pi,17)]
    aux_x = list(reversed([0.5*(1-np.cos(ang)) for ang in np.linspace(0,np.pi,17)[1:16]]))
    [x.append(i) for i in aux_x]
    x.append(0)
    
    # Y coordinates
    y = []
    origin = (airfoil_coords.iloc[0][0]+airfoil_coords.iloc[0][15])/2
    y.append(origin)
    [y.append(j) for j in airfoil_coords.iloc[0][0:15].values.tolist()]
    y.append(0)
    aux_y = list(reversed(airfoil_coords.iloc[0][15:30].values.tolist()))
    [y.append(k) for k in aux_y]
    y.append(origin)
    
    # Airfoil plot     
    fig.plot(x, y, label = label)
    fig.set_xlim([-0.1,1.1])
    fig.set_ylim([-0.2,0.3])
    fig.set_ylabel('$y/c$')
    fig.set_xlabel('$x/c$') 
    fig.set_title('Airfoil plot', fontsize=16)
    fig.legend()
    if fig==None:
        plt.show()
    return


def torch_test_split(X, y, test_size=0.2, seed=1234):
    """
    Returns a train and test set in PyTorch tensor format from a numpy array dataset.
    Inputs:
        - X: numpy array with input data. Each row is a training/testing sample and each column is a feature.
        - y: numpy array with output data. Each row is a training/testing sample and each column is an output.
        - test_size: proportion of the dataset to be used as test set.
        - seed: random seed for reproducibility.
    Outputs:
        - training_data: PyTorch tensor with training data.
        - test_data: PyTorch tensor with test data.
    """
    X_train_0, X_test_0, y_train_0, y_test_0 = train_test_split(X, y, test_size=test_size, random_state=seed)
    X_train = torch.from_numpy(X_train_0).float()
    X_test = torch.from_numpy(X_test_0).float()
    y_train = torch.from_numpy(y_train_0).float()
    y_test = torch.from_numpy(y_test_0).float()
    training_data = []
    testing_data = []
    for i in range(len(X_train)):
        training_data.append((X_train[i], y_train[i]))
    for i in range(len(X_test)):
        testing_data.append((X_test[i], y_test[i]))
    return training_data, testing_data

def normalize_data (data, scaler):
    """
    Normalizes neural network inputs and outputs.
    Inputs:
        - data: data to be normalized. [np.array / pd.DataFrame]
        - scaler: pre-fitted scaler object.
    Outputs:
        - normalized data. [pd.DataFrame]
    """
    if type(data) == pd.DataFrame:
        data = data.to_numpy().reshape(-1,scaler.n_features_in_)
    elif type(data) == np.ndarray:
        data = data.reshape(-1,scaler.n_features_in_)
    else:
        raise(TypeError('Input data must be either a pd.DataFrame or a np.ndarray'))
    norm_data = pd.DataFrame(data = scaler.transform(data), columns = scaler.feature_names_in_)
    return norm_data

def denormalize_data (data, scaler):
    """
    Denormalizes neural network inputs and outputs.
    Inputs:
        - data: data to be denormalized. [np.array / pd.DataFrame]
        - scaler: pre-fitted scaler object.
    Outputs:
        - denormalized data. [pd.DataFrame]
    """
    if type(data) == pd.DataFrame:
        data = data.to_numpy().reshape(-1,scaler.n_features_in_)
    elif type(data) == np.ndarray:
        data = data.reshape(-1,scaler.n_features_in_)
    else:
        raise(TypeError('Input data must be either a pd.DataFrame or a np.ndarray'))
    denorm_data = pd.DataFrame(data = scaler.inverse_transform(data), columns = scaler.feature_names_in_)
    return denorm_data


### Input data analysis

In [None]:
# Define input dataset (.csv) name and path
data_folder = './data/'
dataset_name = 'NACA4Digit_Dataset15Point.csv'

# Import dataset
airfoil_df = pd.read_csv(data_folder + dataset_name)
airfoil_df = airfoil_df.drop('Unnamed: 0', axis=1)    # Remove first column, counter

# Get rid of duplicates
airfoil_df = airfoil_df.drop_duplicates(subset=['yU_1', 'yL_1', 'ReynoldsNumber', 'MachNumber', 'alpha', 'Cl','Cd','Cm'])

airfoil_df.head()

## 2. Autoencoder model
In this section, an MLP based encoder-decoder network will be created and trained to process airfoil coordinates.

**Inputs**
- Upper surface coordinates (15)
- Lower surface coordinates (15)

**Outputs**
- Approximately the same coordinates (network will be trained to do so)

There will be a layer in the middle that will encode the _latent features_ of the set. This is akin to a parameterization method, with arbitrary parameters.

In [None]:
# Data scaler fitting
scaler = MinMaxScaler()
scaler.fit(airfoil_df)

# Assemble a DataFrame with all the minimum and maximum values of each column
# For normalization and de-normalization. Gives an idea of the bounds.
scaler_bounds = pd.DataFrame(data = np.stack([scaler.feature_names_in_, scaler.data_min_, scaler.data_max_], axis=1), columns=['property', 'min', 'max'])

# Data normalization
airfoil_df_norm = normalize_data(airfoil_df, scaler)

# Input and "output" features
# Input and output features are both the same for this dataset.
X = airfoil_df_norm.drop(['Cl', 'Cd', 'Cm', 'ReynoldsNumber', 'MachNumber', 'alpha'], axis=1).values

# Data tensors
training_data, test_data = torch_test_split(X, X, test_size=0.2)

# Data loaders
batch_size = 1024

train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape} {y.dtype}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

### 2.1 Creating models

In [None]:
# Define model
num_features = 3

class EncoderDecoder(nn.Module):
    """
    Encoder-decoder neural network based on MLP layers. 
    Widths = [64, 32, num_features] for the encoder and [32, 64] for the decoder.
    Activation : ReLU.
    """
    def __init__(self, num_features):
        super(EncoderDecoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(30, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, num_features)
        )
        self.decoder = nn.Sequential(
            nn.Linear(num_features, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 30)
        )

    def forward(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
    
    def encode(self,x):
        encoded = self.encoder(x)
        return encoded
    
    def decode(self,x):
        decoded = self.decoder(x)
        return decoded

model = EncoderDecoder(num_features).to(device)
print(model)

In [None]:
# Loss function and optimizer
# As per Moin et al. (2021), use MSE loss and Adam optimizer.
loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005, betas=(0.9, 0.999), eps=1e-08, weight_decay=0)

In [None]:
# Define train and test functions
def train(dataloader, model, loss_fn, optimizer, loss_output = None):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
            
    if loss_output is not None:
        loss_output.append(loss.item())
            
def test(dataloader, model, loss_fn):
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
    test_loss /= num_batches
    print(f"Avg loss: {test_loss:>8f} \n")

In [None]:
# Run training
epochs = 150
loss_plot = []
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer, loss_output = loss_plot)
    test(test_dataloader, model, loss_fn)
print("Done!")

# Loss plot
plt.plot(range(1, epochs+1), loss_plot)
plt.ylabel('Loss')
plt.xlabel('Epoch') 
plt.title('Training loss', fontsize=16)
plt.xlim([0,epochs])
plt.show()

### 2.3 Saving the model

In [None]:
save_model = False
model_name = "AE_MLP128_30_"+str(num_features)+".pth"
if save_model:
    trained_root = "./trained_models/"
    
    model_path = trained_root + model_name
    torch.save(model.state_dict(), model_path)
    print(f"Saved PyTorch Model State to {model_path}")

### 2.4 Loading the model

In [None]:
# For these sections, the CPU will be used
device = 'cpu'
model = EncoderDecoder(num_features).to(device)
model_root = "./trained_models/"
model_name = "AE_MLP128_30_"+str(num_features)+".pth"

model_path = model_root + model_name
model.load_state_dict(torch.load(model_path, map_location=device))

### 2.5 Making predictions on random airfoils

In [None]:
idxpred = np.random.randint(0, len(test_data))
x, y = test_data[idxpred][0].to(device), test_data[idxpred][1].to(device)
dummy = np.array([0, 0, 0, 0, 0, 0]) # Dummy variable to complete the dataset rows to 36 for plotting
with torch.no_grad():
    pred = model(x)
    reconstructed = denormalize_data(np.concatenate((pred.cpu().numpy(), dummy), axis=0),scaler)
    original = denormalize_data(np.concatenate((x.cpu().numpy(), dummy), axis=0),scaler)
    plot1 = plt.subplot2grid((1,3), (0,0), colspan = 3)
    print("------ Comparison ------")
    airfoil_plot(original, fig=plot1, label="Original")
    airfoil_plot(reconstructed, fig=plot1, label="Reconstructed")

## Reformulate the dataset using latent variables
Recreate the dataset with the latent variables and obtain the range of such variables to measure our design space.

In [None]:
# -- New database ---
# Choose whether to process the old database. It will be saved in ./data/

process_database = False
if process_database:
    def encode_coords(row, model):
        """
        Encodes the coordinates of a row of the dataset. Uses an autoencoder model to encode the coordinates.
        Inputs:
        - row: A row of the dataset. Contains 30 coordinate points.
        - model: An autoencoder model.
        Outputs:
        - encoded: The encoded coordinates of the row.
        """
        return model.encode(torch.Tensor(row.iloc[:][0:30].values.reshape(-1,30)).to(device)).detach().cpu().numpy()[0] 

    # --- New dataset ---
    airfoil_df_encoded = airfoil_df_norm.copy(deep=True)
    # Drop the columns containing original coordinates
    airfoil_df_encoded = airfoil_df_encoded[airfoil_df_encoded.columns.drop(list(airfoil_df_encoded.filter(regex='^y')))]

    encoded_coords = airfoil_df_norm.apply(encode_coords, axis=1, model=model)
    for i in range(1, num_features+1):
        airfoil_df_encoded[f"feat{i}"] = [j[i-1] for j in encoded_coords]

    cols = airfoil_df_encoded.columns.tolist()
    cols = cols[-num_features:] + cols[:-num_features]
    airfoil_df_encoded = airfoil_df_encoded[cols]
    airfoil_df_encoded.to_csv("./data/NACA4Digit_Dataset15Point_encoded"+str(num_features)+".csv", index=True)


### Analyze design space
Get bounds for each of the latent variables: minimum and maximum values and width of the variable space ($\Delta$).

In [None]:
# Define input dataset (.csv) name and path
data_folder = './data/'
dataset_name = 'NACA4Digit_Dataset15Point_encoded'+str(num_features)+'.csv'

# Import dataset
airfoil_df_encoded = pd.read_csv(data_folder + dataset_name)
airfoil_df_encoded = airfoil_df_encoded.drop('Unnamed: 0', axis=1)    # Remove first column, counter
# Data scaler fitting
scaler_enc = MinMaxScaler()
scaler_enc.fit(airfoil_df_encoded)

# Assemble a DataFrame with all the minimum and maximum values of each column
# For normalization and de-normalization. Gives an idea of the bounds.
scaler_enc_bounds = pd.DataFrame(data = np.stack([scaler_enc.feature_names_in_, scaler_enc.data_min_, scaler_enc.data_max_], axis=1), columns=['property', 'min', 'max'])
scaler_enc_bounds['delta'] = [(scaler_enc_bounds[scaler_enc_bounds['property']==feat]['max'].values[0]-scaler_enc_bounds[scaler_enc_bounds['property']==feat]['min'].values[0]) for feat in scaler_enc_bounds['property'].values]
scaler_enc_bounds

### Generate and plot an airfoil
Use the decoder part of the AE network to generate and plot an airfoil from its randomly generated latent representation.

In [None]:
latent_random = np.array([np.random.rand()*(scaler_enc_bounds[scaler_enc_bounds['property']==f'feat{i}']['delta'].values[0])+scaler_enc_bounds[scaler_enc_bounds['property']==f'feat{i}']['min'].values[0] for i in range(1, num_features+1)])
print(latent_random.reshape(-1,num_features)[0])
print(type(latent_random), latent_random.shape)
invented = denormalize_data(np.concatenate((model.decode(torch.Tensor(latent_random.reshape(-1,num_features)).to(device)).detach().cpu().numpy()[0], dummy), axis=0),scaler)
airfoil_plot(invented, label="Invented")