# Conditional VAEs for Uncertainty Quantification in Fatigue Simulations
If running on google colab remember to change your runtime to "GPU" :


**(Runtime’ > ‘Change runtime type’ > ‘Hardware accelerator’ > select GPU)**

In [None]:
#@title Clone repository, and prepare directory for the rest of computations:
!git clone https://github.com/mylonasc/fatigue_cvae.git
!cp -r fatigue_cvae/data .
!cp fatigue_cvae/utils.py .
!pip install tqdm

In [None]:
# Imports
import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
import numpy as np
from utils import BladeCSMeshPlotter, default_mesh_file,DELDatasetPreProc, PlotFatvals
import matplotlib.pyplot as pplot
from tqdm import tqdm

In [None]:
tf.__version__, tfp.__version__


In [None]:
# A conditional VAE (IWAE) model using the functional Keras API
class CVAEModel:
    def __init__(self, n_conditioning, input_dims, n_latent,layer_width = 64, n_layers_enc = 2,n_layers_dec = 2,
                 activation = "relu", n_iwae = 40, enc_use_cond = True):
        """
        A simple FFNN conditional VAE model [1] for tf 2.4
        Implements the importance weighted autoencoder [2] by sampling the latent space multiple times.
        A similar implementation (initially in tensorflow 1.13) was used for [3].
        
        parameters:
          n_conditioning : number of conditioning variables
          input_dims     : number of dimensions of the input
          n_latent       : the size of the latent variable.
          layer_width    : (default: 50) a width value for all layers
          n_layers_enc   : (default: 2) the number of encoder layers (with activations). The part of the
                           encoder that parametrizes the std of the latent is passed through a softplus.
          n_layers_dec   : (default: 2) number of decoder layers
          activation     : ['relu'] what activation to use 
          n_iwae         : [10] number of samples from the latent space passed through the decoder for each pass. 
          
          
        references:
        
          [1] Kingma, Diederik P., and Max Welling. "Auto-encoding variational bayes." 
              arXiv preprint arXiv:1312.6114 (2013).
              
          [2] Burda, Yuri, Roger Grosse, and Ruslan Salakhutdinov. "Importance weighted autoencoders." 
              arXiv preprint arXiv:1509.00519 (2015).
              
          [3] Mylonas, Charilaos, Imad Abdallah, and E. N. Chatzi. "Deep unsupervised learning
              for condition monitoring and prediction of high dimensional data with application 
              on windfarm scada data." Model Validation and Uncertainty Quantification, Volume 3. 
              Springer, Cham, 2020. 189-196.

        """
        all_params = [n_conditioning, input_dims, layer_width, n_layers_dec, n_layers_enc, n_latent,activation, n_iwae]
        pnames = ["n_conditioning", "input_dims", "layer_width", "n_layers_dec", "n_layers_enc", "n_latent","activation","n_iwae"]
        self.cvae_params = {k: v for v,k in zip(all_params,pnames)}
        
        if enc_use_cond:
            input_dims_xx = input_dims  + n_conditioning
        else:
            input_dims_xx = input_dims
        
        self.encoder = CVAEModel.make_encoder(input_dims = input_dims_xx,
                                              layer_width = layer_width,
                                              n_layers = n_layers_enc , 
                                              n_latent= n_latent)
        
        
        prior = tfd.MultivariateNormalDiag(loc = tf.zeros(n_latent), scale_diag=tf.ones(n_latent))
        posterior = tfp.layers.DistributionLambda(make_distribution_fn = lambda t: tfd.MultivariateNormalDiag(loc = t[:,0:n_latent], 
                                                                                                              scale_diag= 1e-5 + tf.nn.softplus(t[:,n_latent:])),
                                                  convert_to_tensor_fn= lambda s : s.sample(n_iwae))
        
        
        X = tf.keras.Input(shape = (input_dims,), name = "X_in")
        W = tf.keras.Input(shape = (n_conditioning,) , name = "W")
        
        if enc_use_cond:
            X_in = tf.keras.layers.concatenate([X,W], axis = -1)
        else:
            X_in = X
        
        vae_params = self.encoder(X_in)
        
        posterior_out = posterior(tf.keras.layers.concatenate(vae_params))
        
        kl_div = tfd.kl_divergence(tfd.MultivariateNormalDiag(loc = vae_params[0], scale_diag=vae_params[1] ), prior)
        self.posterior_out = posterior_out
        
        dec_output = input_dims
        self.decoder = CVAEModel.make_decoder(output_dims = dec_output,
                                      layer_width = layer_width,
                                      conditioning_dims = n_conditioning,
                                      n_layers = n_layers_dec,
                                      n_latent= n_latent,
                                      n_iwae = n_iwae, 
                                      activation = activation,
                                      input_tensor = posterior_out)
        

        decoder_out = self.decoder([posterior_out, W])
        y_decoder_out = decoder_out
        self.vae_model = tf.keras.Model(inputs = [X,W] , outputs = [y_decoder_out, posterior_out, kl_div])
        
        self.posterior_out = posterior_out
        
        self.prior = prior
        self.posterior = posterior
    
    
        
    @staticmethod
    def make_encoder(input_dims = None,layer_width = None, n_layers = None,n_latent = None,activation = None):
        x_in = tf.keras.layers.Input(shape = (input_dims,))

        ln = tf.keras.layers.Dense(layer_width)(x_in)
        for l in range(n_layers - 1):
            ln = tf.keras.layers.Dense(layer_width, activation = activation)(ln)
            ln = tf.keras.layers.Dropout(rate = 0.2)(ln)

        l_out_mean = tf.keras.layers.Dense(n_latent)(ln)
        l_out_std = tf.keras.layers.Dense(n_latent, activation = tf.nn.softplus)(ln)

        return tf.keras.Model(x_in, [l_out_mean, l_out_std])

    @staticmethod
    def make_decoder(output_dims = None,
                     n_latent= None ,
                     layer_width = None, 
                     conditioning_dims = None , 
                     n_layers = None ,
                     activation = None,
                     n_iwae = None,
                    input_tensor = None):
        
        # This is to circumvent a keras bug (see https://github.com/tensorflow/probability/issues/1200)
        dd = tf.identity(input_tensor, name = "z_in")
        
        z_in = tf.keras.layers.Input(tensor = dd, name = "z_in")
        
        w_in = tf.keras.layers.Input(shape = (conditioning_dims, ), name =  "w_in")
        w_in_tiled = tf.tile(w_in[tf.newaxis, ... ] , [n_iwae, 1,1] )
        
        yyin = tf.keras.layers.concatenate([z_in, w_in_tiled])
        
        ln = tf.keras.layers.Dense(layer_width)(yyin)

        for l in range(n_layers - 1):
            ln = tf.keras.layers.Dense(output_dims, activation = activation)(ln)
            ln = tf.keras.layers.Dropout(rate = 0.2)(ln)

        y_out = tf.keras.layers.Dense(output_dims)(ln)
        return tf.keras.Model(inputs=[z_in, w_in], outputs = y_out)


# Load the fatigue dataset
The dataset consists of 1999 simulations of fatigue of the blade root. Please refer back to the paper for more details.

In [None]:
dataset = DELDatasetPreProc()
x_norm = dataset.get_normalized_data_DEL()
w_norm = dataset.get_normalized_data_X()


In [None]:
#@Title some code that is needed for plotting.
plot_blade = BladeCSMeshPlotter(default_mesh_file, dataset.hasval_inds)
mesh = plot_blade.mesh
nl_inds = np.array(mesh.nl_2d[:,0], dtype = np.int)
el_loc = mesh.nl_2d[:,1:3]
elnums = mesh.el_2d[:,0]
node_1 = el_loc[mesh.el_2d[:,1]-1]
node_2 = el_loc[mesh.el_2d[:,2]-1]
node_3 = el_loc[mesh.el_2d[:,3]-1]
node_4 = el_loc[mesh.el_2d[:,4]-1]
node_pos_avg = (node_1 + node_2 + node_3 + node_4 )/4

r0 =( node_pos_avg[:,0]**2 + node_pos_avg[:,1] ** 2 ) **0.5
theta_el =np.arctan2(  node_pos_avg[:,1] , node_pos_avg[:,0]) 

# Filter the nodes that have a value for DEL:
r0 = r0[plot_blade.mesh_finite_mask]
theta_el = theta_el[plot_blade.mesh_finite_mask]

pfv = PlotFatvals(theta_el, r0)


# Training a CVAE on fatigue data

In [None]:
# initialize the optimizer:
opt = tf.keras.optimizers.Adam(learning_rate= 0.001)

In [None]:
## Train-test split:
from sklearn.model_selection import train_test_split
xtrain,xtest, wtrain , wtest = train_test_split(x_norm, w_norm, test_size=0.1)

In [None]:
input_dims = x_norm.shape[-1]
cond_size = w_norm.shape[-1]
nlatent = 30
cvae = CVAEModel( cond_size,input_dims,  nlatent, layer_width=300)

def eval_loss(X,W, beta = 0.1 ):
    Xhat ,z_out, kl_loss = cvae.vae_model([X,W])
    rec_loss = tf.reduce_mean(tf.pow(Xhat-X,2),-1)
    return rec_loss + beta * kl_loss/nlatent , rec_loss, kl_loss, Xhat, z_out

In [None]:
train_losses = []
test_losses = []

In [None]:
#@title A function to anneal the beta parameter
def beta_cyclic_anneal(epoch, beta_0 = 0.01,init_beta_epochs = 10, burnin_beta = 20,
                       beta_max = 0.5, beta_min = 2.,
                       period = 10):
    """
    A funcion for changing the "beta " parameter during training according to a  cyclic annealing scheme, 
    a burn-in perior, and a cyclic annealing. Feel free to experiment with experimenting with it!
    
    
    parameters:
    
      beta_0    : starting value for beta
      init_beta_epochs : number of epochs that beta remains un-changed with beta-0
      burnin_beta      : the epoch that the beta value is (linearly) ramped to, 
                         before the start of the annealing schedule
      beta_max         : the max value for beta
      beta_min         : the min value for beta
      period           : how many epochs to go from min to max beta
    """
    if epoch <init_beta_epochs:
        return beta_0
    
    
    if epoch>=init_beta_epochs and epoch < burnin_beta:
        # ramp-up
        return beta_0 + ((beta_max - beta_0)/(burnin_beta-init_beta_epochs)) * (epoch-init_beta_epochs)
    
    if epoch >= init_beta_epochs:
        # sawtooth cycles - max/min beta:
        db = (beta_max - beta_min)/period
        return beta_max - (epoch - init_beta_epochs)%period * db
        

pplot.plot([beta_cyclic_anneal(ee) for ee in range(100)],'.-')
pplot.title("Beta annealing schedule.")
pplot.grid()

In [None]:
# Training loop:
epochs = 400

for e in tqdm(range(epochs)):
    beta_curr = beta_cyclic_anneal(e)
    with tf.GradientTape() as tape:
        tot_loss,rec_loss, kl_loss,Xhat, z_out = eval_loss(xtrain, wtrain,beta = beta_curr)
        grads = tape.gradient(tot_loss, cvae.vae_model.weights)
        opt.apply_gradients(zip(grads,  cvae.vae_model.weights))
        train_losses.append([np.mean(tot_loss.numpy().flatten()), np.mean(rec_loss), np.mean(kl_loss)])
        

    if e % 50 == 0:
        test_tot_loss, test_rec_loss, test_kl_loss, test_xout, test_zout = eval_loss(xtest, wtest, beta = beta_curr)
        test_losses.append([np.mean(test_tot_loss), np.mean(test_rec_loss), np.mean(test_kl_loss)])
        pplot.subplot(2,1,1)
        pplot.plot(test_xout[0][0::10].numpy(),  xtest[0::10],'.C0')
        pplot.plot(xtest[0::10],xtest[0::10])
        pplot.show()
        
        ii = 5
        xout  = cvae.vae_model([xtest, wtest])
        pplot.plot(tf.reduce_mean(xout[0],axis=0)[ii])
        pplot.plot(xtest[ii])        
        pplot.show()
        pplot.pause(0.1)
        

# Plot CVAE results
The approximate posterior is replaced with the prior. If the KL part of the loss is small and the latent space can be captured with transformations of a spherical Gaussian, the prior should yield realistic samples from the distribution of the MC simulation.

In [None]:
# Evaluate the decoder using the spherical gaussian prior:
wcond = wtest
xvals = xtest

niwae = cvae.cvae_params['n_iwae']
z_prior = cvae.prior.sample(niwae)
xout_from_prior = cvae.decoder([np.tile(z_prior[:,np.newaxis,:], [1,wcond.shape[0],1]), wcond])

# Evaluate the VAE:
xout = cvae.vae_model([xvals,wcond])
def scale_to_plot(F_, mm2 = 0.0957):
    ee = 0.2
    F_ = dataset.unnormalize_DEL(F_)
    F_ = (F_**ee )/mm2
    return F_

F_post = xout[0].numpy()#
F_prior = xout_from_prior.numpy()
Fsc_post = scale_to_plot(F_post)
Fsc_prior = scale_to_plot(F_prior)
Fact = scale_to_plot(xtest)
Fact_train = scale_to_plot(xtrain)

In [None]:
#@title: a function to plot the CVAE samples together with the neighbors from the dataset:
def plot_with_neighbs(wsp, ti, shear, xbounds = 3):
    wunnorm = dataset.unnormalize_X(w_norm)

    fig, ax = pplot.subplots(nrows=2,ncols=2, figsize = (20,10))
    gs = ax[0,0].get_gridspec()
    for ax_ in ax[:,0]:
        ax_.remove()
    axleft = fig.add_subplot(gs[:,0])

    vv_unnorm = np.array([[ti,wsp,shear]])

    vv = (vv_unnorm-dataset.data_input_mean) / dataset.data_input_std
    w_unnorm = dataset.data_input

    # Compute 10 samples from the CVAE using the prior and plot:
    niwae = cvae.cvae_params['n_iwae']
    z_prior = cvae.prior.sample(niwae)

    wcond = np.array([])
    xout_from_prior = cvae.decoder([np.tile(z_prior[:,np.newaxis,:], [1,vv.shape[0],1]), vv])
    xout_from_prior
    Fsc_prior_ = scale_to_plot(xout_from_prior)
    for k in range(niwae):
        pfv.make_plot(Fsc_prior_[k]*500000, 0, opacity=0.1, color="C0",ax = axleft, tight = False)
    
    axleft.set_title("Fatigue estimates\n ti/wsp/se: %2.1f/%2.2f/%2.1f"%(wsp, ti, shear), fontsize = 20)
    axleft.set_xlim([-xbounds, xbounds])
    axleft.set_ylim([-xbounds, xbounds])
    axleft.grid()
    # pfv.make_plot(Fact*500000, ii, opacity=0.5, color="C1", ax = ax_curr, tight = False)
    # Find nearest neighbors from training set:
    nneighbors = 2
    inds_closest = np.argsort(np.sum(np.square(vv_unnorm-w_unnorm)*[100.,10.,1],1))[0:nneighbors]
    # for kk in range
    for n in inds_closest:
        pfv.make_plot(scale_to_plot(x_norm[[n]])*500000, 0 , opacity=0.9, color="C1",ax = axleft, tight = False)

    # plot nearest neighbor:

    ax[0,1].plot(w_unnorm[:,0], w_unnorm[:,1],'.', label = "all data")
    ax[0,1].plot(w_unnorm[inds_closest,0], w_unnorm[inds_closest,1],'*', markersize = 15, label = "neighbors")
    ax[0,1].plot(vv_unnorm[0,0], vv_unnorm[0,1],'*r', markersize = 15, label = "CVAE conditioning")
    ax[0,1].set_xlabel("Ti[pct]"); ax[0,1].set_ylabel("Wsp[m/s]")
    ax[0,1].legend(fontsize = 15)

    ax[1,1].plot(w_unnorm[:,1], w_unnorm[:,2],'.')
    ax[1,1].plot(w_unnorm[inds_closest,1], w_unnorm[inds_closest,2],'*',markersize = 15)
    ax[1,1].plot(vv_unnorm[0,1], vv_unnorm[0,2],'*r', markersize = 15, label  = "CVAE conditioning")

    ax[1,1].set_xlabel("Wsp[m/s]"); ax[1,1].set_ylabel("shear exp")
    ax[0,1].grid(True)
    ax[1,1].grid(True)
    return fig

In [None]:
from ipywidgets import FloatSlider, interact

xbounds = 3

figidx = 0
@interact(wsp = FloatSlider(min = 0,max = 20, value = 5.90),
          ti  = FloatSlider(min = 0, max = 0.4, value = 0.16, step = 0.4/20),
          shear = FloatSlider(min =-1.64, max = 2.15, value = 0))
def plot_with_inputs(wsp, ti, shear):
    plot_with_neighbs(wsp, ti, shear)



In [None]:
#@title Run the following for creating an animation


#plot_with_neighbs
ngridpoints = 20
ti0 = 0.05
tigrid = np.linspace(ti0,0.15,ngridpoints)
wsp0 = 5
wspgrid = np.linspace(wsp0,20,ngridpoints)

frame_idx = 0
for kk in range(ngridpoints):
    ti = tigrid[kk]
    ff = plot_with_neighbs(wsp0, ti, 0.)
    ff.savefig("anim_%03i.jpg"%frame_idx)
    frame_idx += 1

for kk in reversed(range(ngridpoints)):
    ti = tigrid[kk]
    ff = plot_with_neighbs(wsp0, ti, 0.)
    ff.savefig("anim_%03i.jpg"%frame_idx)
    frame_idx += 1

    
for kk in range(ngridpoints):
    ff = plot_with_neighbs(wspgrid[kk], ti0, 0.)
    ff.savefig("anim_%03i.jpg"%frame_idx)
    frame_idx += 1

for kk in reversed(range(ngridpoints)):
    ff = plot_with_neighbs(wspgrid[kk], ti0, 0.)
    ff.savefig("anim_%03i.jpg"%frame_idx)
    frame_idx += 1
