# Variational Autoencoder

In this notebook we implement a basic test of a Variational Autoencoder (VAE). The VAE is used to learn the distribution of yield curves.

The notebook is structured as follows:

  - Generate input yield curves from a Hull White model.
  - Set up a VAE based on the [TensorFlow tutorial](https://www.tensorflow.org/tutorials/generative/cvae).
  - Train the VAE on the Hull White yield curves and test the model.
  - Save and plot the model.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm.keras import TqdmCallback

import tensorflow as tf


## Hull White Yield Curves

We model yield curves in terms of *continuously compounded zero rates*. A zero rate yield curve is a function $z:[0,\infty)\times[0,\infty) \rightarrow \mathbb{R}$. For a given observation time $t\geq 0$ and maturity time $T > t$, the zero rate $z(t,T)$ gives a zero coupon bond price (or discount factor) $P(t,T)$ via the relation
$$
  P(t,T) = e^{-z(t,T)(T-t)}.
$$
Equivalently, we can calculate the zero rate from a zero coupon bond price as
$$
  z(t,T) = - \frac{\log\left( P(t,T) \right)}{T-t}.
$$
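
As a quick numerical illustration of the two relations above, the following sketch converts a zero rate into a discount factor and back. The helper names `discount_factor` and `zero_rate` and the input values are chosen here for illustration only; they are not used elsewhere in the notebook.

```python
import numpy as np

def discount_factor(z, t, T):
    """Zero coupon bond price P(t,T) from a continuously compounded zero rate."""
    return np.exp(-z * (T - t))

def zero_rate(P, t, T):
    """Continuously compounded zero rate z(t,T) from a zero coupon bond price."""
    return -np.log(P) / (T - t)

t, T = 1.0, 6.0
z = 0.03                        # 3% zero rate (illustrative value)
P = discount_factor(z, t, T)    # equals exp(-0.03 * 5)
z_back = zero_rate(P, t, T)     # recovers the input rate
print(P, z_back)
```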

In the Hull White model, zero bond prices can be reconstructed from a Gaussian state variable $x_t$ via
$$
  P(t,T) = \frac{P(0,T)}{P(0,t)} e^{-G(t,T)x_t - \frac{1}{2}G(t,T)^2y(t)}.
$$
Here, $G(t,T)$ and $y(t)$ are model functions given as
$$
  G(t,T) = \frac{1}{a}\left[1 - e^{-a(T-t)}\right]
$$
and
$$
  y(t) = \int_0^t \left[e^{-a(t-u)} \sigma(u) \right]^2 du =  \frac{1}{2a}\left[1 - e^{-2at}\right]\sigma^2.
$$
Model parameters are mean reversion $a$ and short rate volatility $\sigma(t)=\sigma$.

Consequently, yield curves can be represented as
$$
  z(t,T) = \frac{G(t,T)}{T-t} x_t + \frac{1}{2} \frac{G(t,T)^2y(t)}{T-t} + \frac{z(0,T)\,T - z(0,t)\,t}{T-t}.
$$
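
The closed form for $y(t)$ can be checked against a direct numerical integration. The sketch below is a sanity check under the constant-volatility assumption; the parameter values mirror those used later in the notebook, while the trapezoidal rule and its grid size are arbitrary choices made here.

```python
import numpy as np

a, sigma, t = 0.15, 0.0075, 10.0

# closed form: y(t) = sigma^2 * (1 - exp(-2 a t)) / (2 a)
y_closed = sigma**2 * (1.0 - np.exp(-2.0 * a * t)) / (2.0 * a)

# trapezoidal integration of [exp(-a (t-u)) * sigma]^2 over u in [0, t]
u = np.linspace(0.0, t, 10001)
f = (np.exp(-a * (t - u)) * sigma) ** 2
y_numeric = np.sum(0.5 * (f[1:] + f[:-1]) * np.diff(u))

print(y_closed, y_numeric)
```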

In [None]:
class HullWhiteModel:

    def __init__(self, mean_reversion, volatility, zeroYieldCurve=None):
        self.mean_reversion = mean_reversion
        self.volatility = volatility
        self.zeroYieldCurve = zeroYieldCurve

    def G(self, t,T): 
        return (1 - np.exp(-self.mean_reversion*(T-t))) / self.mean_reversion
    
    def y(self, t):
        return self.volatility**2 * (1 - np.exp(-2*self.mean_reversion*(t))) / 2 / self.mean_reversion

    def zeroRate(self, x, t, T):
        G = self.G(t,T)
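        z = G / (T-t) * x + 0.5 * G**2 * self.y(t) / (T-t)
        if self.zeroYieldCurve is not None:
            # initial-curve term -log(P(0,T)/P(0,t)) / (T-t),
            # assuming zeroYieldCurve(s) returns the zero rate z(0,s)
            z += (self.zeroYieldCurve(T) * T - self.zeroYieldCurve(t) * t) / (T-t)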
        return z

    def yieldCurves(self, t, delta_T, num_samples):
        """
        Simulate yield curves observed at future time t using the model parameters.

        State variables are simulated in t-forward measure.

        Arguments:

        t        ... future observation time
        delta_T  ... array of time offsets to calculate z(t, t + delta_T)
        num_samples  ... number of yield curve samples calculated

        Returns:

        An array of shape (num_samples, len(delta_T)) containing
        simulated zero rates z(t, t + delta_T).
        """
        x = np.random.normal(size=(num_samples,1)) * np.sqrt(self.y(t))
        return self.zeroRate(x, t, t + delta_T)


We define a utility function to consistently plot curves.

In [None]:
def plot_yieldCurves(curves, N=10):
    plt.figure(figsize=(6,4))  # pyplot-managed figure; plt.Figure would create a detached object
    N = np.minimum(N, curves.shape[0])
    for yc in curves[:N]:
        plt.plot(delta_T, yc)
    plt.xlabel('maturity times $T-t$')
    plt.ylabel('zero rate $z(t,T)$')
    # raw string avoids the invalid escape sequence '\s' in '\sigma'
    plt.title(r'simulated curves for $t=%.1f$ ($a=%.1f$%%, $\sigma=%.1f$bp)' % (t, model.mean_reversion*1e2, model.volatility*1e4))
    plt.show()
    #
    print('Shape: ' + str(curves.shape))

We set up a simple Hull White model and simulate future yield curves.

In [None]:
model = HullWhiteModel(0.15, 0.0075)  # 15% mean reversion (fairly high) and 75bp vol
t = 10.0  # horizon in 10y
delta_T = np.array([1.0/365, 0.5, 1.0, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0, 20.0])  # a typical curve grid 

num_samples = 2**10

yieldCurves = model.yieldCurves(t, delta_T, num_samples)

plot_yieldCurves(yieldCurves)
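
Since $z(t,T)$ is affine in the single Gaussian state variable $x_t$, all curves simulated at a fixed $t$ lie on a one-dimensional affine subspace. A singular value decomposition makes this visible. The sketch below re-implements the model formulas in a self-contained way (with no initial curve, as in the cell above) rather than reusing `HullWhiteModel`; the seed and variable names are assumptions made for illustration.

```python
import numpy as np

a, sigma, t = 0.15, 0.0075, 10.0
delta_T = np.array([1.0/365, 0.5, 1.0, 2.0, 3.0, 5.0, 7.0, 10.0, 15.0, 20.0])

G = (1.0 - np.exp(-a * delta_T)) / a                  # G(t, t + delta_T)
y = sigma**2 * (1.0 - np.exp(-2.0 * a * t)) / (2.0 * a)

rng = np.random.default_rng(42)                       # seed chosen arbitrarily
x = rng.normal(size=(1024, 1)) * np.sqrt(y)           # state variable samples
curves = G / delta_T * x + 0.5 * G**2 * y / delta_T   # shape (1024, 10)

# singular values of the centered sample: only the first one is material
centered = curves - curves.mean(axis=0)
s = np.linalg.svd(centered, compute_uv=False)
print(s / s[0])
```

This is consistent with the choice `latent_dim=1` used for the VAE later in the notebook.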

## Variational Autoencoder using Keras

We set up a VAE implementation using Keras; see the [TensorFlow tutorial](https://www.tensorflow.org/tutorials/generative/cvae).

In [None]:
class VariationalAutoencoder(tf.keras.Model):
    """A variational autoencoder as a Keras model."""

    def __init__(self, input_dim, hidden_dim, latent_dim, alpha=0.01):
        super().__init__()
        self.input_dim  = input_dim   # number of inputs and outputs flattened as vector 
        self.hidden_dim = hidden_dim  # number of hidden nodes
        self.latent_dim = latent_dim  # number of latent variables, i.e. dimensionality of latent space
        self.alpha      = alpha       # weight of the convex combination: reconstruction loss only (0) or latent loss only (1)
        #
        lrelu = tf.keras.layers.LeakyReLU(alpha=0.3)  # functor for activation function
        #
        self.encoder = tf.keras.Sequential( [
            tf.keras.layers.InputLayer(input_shape=(self.input_dim,)),  # shape as a one-element tuple
            tf.keras.layers.Dense(self.hidden_dim, activation=lrelu),
            tf.keras.layers.Dense(2 * self.latent_dim, activation=lrelu),  # mu, logvar
        ] )
        self.decoder = tf.keras.Sequential( [
            tf.keras.layers.InputLayer(input_shape=(self.latent_dim,)),  # shape as a one-element tuple
            tf.keras.layers.Dense(self.hidden_dim, activation=lrelu),
            tf.keras.layers.Dense(self.input_dim, activation=tf.keras.activations.linear),
        ] )

    def encode(self, x):
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar
  
    def reparameterize(self, mean, logvar):
        eps = tf.random.normal(shape=tf.shape(mean))
        return eps * tf.exp(logvar * .5) + mean
  
    def decode(self, z):
        return self.decoder(z)

    def call(self, inputs):
        """
        Specify model output calculation for training.

        This function overrides tf.keras.Model.call.
        """
        mean, logvar = self.encode(inputs)
        z = self.reparameterize(mean, logvar)
        x_out = self.decode(z)
        return tf.concat([x_out, mean, logvar], axis=1)

    def lossfunction(self, y_true, y_pred, sample_weight=None):
        """
        Specify the objective function for optimisation.

        This function is input to tf.keras.Model.compile(...)
        """
        y = tf.cast(y_true, tf.float32)
        x_out  = y_pred[:, : -2*self.latent_dim                  ]
        mean   = y_pred[:, -2*self.latent_dim : -self.latent_dim ]
        logvar = y_pred[:, -self.latent_dim :                    ]
        #
        decoded_loss = tf.reduce_sum(tf.math.squared_difference(x_out, y), 1)
        latent_loss = 0.5 * tf.reduce_sum(tf.exp(logvar) + tf.square(mean)  - 1. - logvar, 1)  # https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence#Multivariate_normal_distributions
        loss = tf.reduce_mean((1 - self.alpha) * decoded_loss + self.alpha * latent_loss)
        return loss

    def sample(self, n_samples = 10, randoms=None):
        """
        Calculate a sample of observations from the model.
        """
        if randoms is None: # we do need to sample
            randoms = tf.random.normal(shape=(n_samples, self.latent_dim))
        return self.decode(randoms)

    def functional_model(self):
        """
        Return a standard tf.keras.Model via Functional API.

        The resulting model can be used to plot the architecture.
        """
        x = tf.keras.Input(shape=(self.input_dim,))
        return tf.keras.Model(inputs=[x], outputs=self.call(x))
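
The `latent_loss` term in `lossfunction` is the closed-form Kullback-Leibler divergence between a diagonal normal distribution with the encoded mean and log-variance and the standard normal distribution. As a sanity check, the following NumPy sketch (independent of TensorFlow; function names, sample size and seed are chosen here for illustration) compares the closed form with a Monte Carlo estimate whose samples are drawn with the same reparameterization as in `reparameterize`.

```python
import numpy as np

def kl_closed_form(mean, logvar):
    """KL( N(mean, exp(logvar)) || N(0, 1) ), summed over latent dimensions."""
    return 0.5 * np.sum(np.exp(logvar) + mean**2 - 1.0 - logvar)

def kl_monte_carlo(mean, logvar, n=200_000, seed=0):
    """Estimate the same divergence as E_q[log q(z) - log p(z)]."""
    rng = np.random.default_rng(seed)
    eps = rng.normal(size=(n, mean.shape[0]))
    z = eps * np.exp(0.5 * logvar) + mean                  # reparameterization
    log_q = -0.5 * (eps**2 + logvar + np.log(2 * np.pi))   # density of q at z
    log_p = -0.5 * (z**2 + np.log(2 * np.pi))              # standard normal density at z
    return np.mean(np.sum(log_q - log_p, axis=1))

mean = np.array([0.5, -1.0])
logvar = np.array([-0.2, 0.3])
kl_exact = kl_closed_form(mean, logvar)
kl_mc = kl_monte_carlo(mean, logvar)
print(kl_exact, kl_mc)
```

The two numbers should agree up to Monte Carlo noise, and the divergence vanishes when the mean and log-variance are both zero.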


## Model Training and Testing

Now we can set up a model.

In [None]:
vae_model = VariationalAutoencoder(input_dim=yieldCurves.shape[1], hidden_dim=yieldCurves.shape[1], latent_dim=1, alpha=0.5*1e-4)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.005)
vae_model.compile(optimizer=optimizer, loss=vae_model.lossfunction)

The model is trained using the curves generated from the (analytic) Hull White model.

In [None]:
vae_model.fit(x=yieldCurves, y=yieldCurves, epochs=100, callbacks=[TqdmCallback(verbose=0)], verbose=0)
yieldCurves_vae2 = vae_model.sample(10)
plot_yieldCurves(yieldCurves_vae2)
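
Beyond visual inspection, one simple quantitative test is to compare per-maturity means and standard deviations of generated curves with those of the training curves. The helper `compare_moments` below is a hypothetical utility, not part of the notebook, and is demonstrated on synthetic arrays only.

```python
import numpy as np

def compare_moments(reference, generated):
    """Return max absolute differences of column means and standard deviations."""
    reference = np.asarray(reference, dtype=float)
    generated = np.asarray(generated, dtype=float)
    d_mean = np.max(np.abs(reference.mean(axis=0) - generated.mean(axis=0)))
    d_std = np.max(np.abs(reference.std(axis=0) - generated.std(axis=0)))
    return d_mean, d_std

# synthetic stand-ins for training and generated curves
rng = np.random.default_rng(1)
ref = rng.normal(loc=0.01, scale=0.005, size=(4096, 10))
gen = rng.normal(loc=0.01, scale=0.005, size=(4096, 10))
d_mean, d_std = compare_moments(ref, gen)
print(d_mean, d_std)
```

In the notebook, one could presumably call `compare_moments(yieldCurves, vae_model.sample(1024))`; a sample larger than the 10 curves plotted above gives more stable estimates.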

## Save and Plot a Model

In this section we explore functionality to save and plot Keras models.

In [None]:
model_folder_name = 'HullWhiteCurveVae'

vae_model.save(model_folder_name)
#
reconstructed_model = tf.keras.models.load_model(model_folder_name,
    custom_objects={
        'VariationalAutoencoder': VariationalAutoencoder,
        'lossfunction' : VariationalAutoencoder.lossfunction,
    }
)

We lose custom methods like *VariationalAutoencoder.sample(...)* in the reconstructed model. Nevertheless, we can still access its attributes, and the *decoder* is all we need to generate samples.

In [None]:
def sample(model, latent_dim, n_samples = 10, randoms=None):
    """
    Calculate a sample of observations from the model.
    """
    if randoms is None: # we do need to sample
        randoms = tf.random.normal(shape=(n_samples, latent_dim))
    return model.decoder(randoms)

plot_yieldCurves(sample(reconstructed_model, 1, 10))

In [None]:
tf.keras.utils.plot_model(
    vae_model.functional_model(),
    to_file="model.png",
    show_shapes=True,
    show_dtype=False,
    show_layer_names=False,
    rankdir="TB",
    expand_nested=False,
    dpi=96,
    layer_range=None,
    show_layer_activations=False,
)

## Conditional VAE

We extend the VAE by adding external conditions. This follows the ideas presented in [GitHub:MarketSimulator](https://github.com/imanolperez/market_simulator).

In our yield curve example the external condition is *time-to-maturity*. That is, instead of a yield curve as a vector, we now learn a yield curve function.

In [None]:
class ConditionalVariationalAutoencoder(tf.keras.Model):
    """Conditional variational autoencoder."""

    def __init__(self, input_dim, hidden_dim, latent_dim, output_dim, alpha=0.01):
        super().__init__()
        self.input_dim  = input_dim
        self.hidden_dim = hidden_dim
        self.latent_dim = latent_dim
        self.output_dim = output_dim
        self.alpha      = alpha
        #
        lrelu = tf.keras.layers.LeakyReLU(alpha=0.3)  # functor for activation function
        #
        self.encoder = tf.keras.Sequential( [
            tf.keras.layers.InputLayer(input_shape=(self.input_dim,)),  # shape as a one-element tuple
            tf.keras.layers.Dense(self.hidden_dim, activation=lrelu),
            tf.keras.layers.Dense(2 * self.latent_dim, activation=lrelu),  # mu, logvar
        ] )
        self.decoder = tf.keras.Sequential( [
            tf.keras.layers.InputLayer(input_shape=(self.latent_dim + self.input_dim - self.output_dim,)),  # latent variables plus conditions
            tf.keras.layers.Dense(self.hidden_dim, activation=lrelu),
            tf.keras.layers.Dense(self.output_dim, activation=tf.keras.activations.linear),
        ] )

    def encode(self, x, c):
        x = tf.concat([x, c], axis=1)
        mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
        return mean, logvar
  
    def reparameterize(self, mean, logvar):
        eps = tf.random.normal(shape=tf.shape(mean))
        return eps * tf.exp(logvar * .5) + mean
  
    def decode(self, z, c):
        z = tf.concat([z, c], axis=1)
        return self.decoder(z)

    def call(self, inputs, training=False):
        assert isinstance(inputs, (list, tuple))
        assert len(inputs)==2
        x = inputs[0]
        c = inputs[1]
        mean, logvar = self.encode(x, c)
        z = self.reparameterize(mean, logvar)
        x_out = self.decode(z, c)
        return tf.concat([x_out, mean, logvar], axis=1)

    def lossfunction(self, y_true, y_pred, sample_weight=None):
        y = tf.cast(y_true, tf.float32)
        x_out  = y_pred[:, : -2*self.latent_dim                  ]
        mean   = y_pred[:, -2*self.latent_dim : -self.latent_dim ]
        logvar = y_pred[:, -self.latent_dim :                    ]
        #
        decoded_loss = tf.reduce_sum(tf.math.squared_difference(x_out, y), 1)
        latent_loss = 0.5 * tf.reduce_sum(tf.exp(logvar) + tf.square(mean)  - 1. - logvar, 1)  # https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence#Multivariate_normal_distributions
        loss = tf.reduce_mean((1 - self.alpha) * decoded_loss + self.alpha * latent_loss)
        return loss

    def sample(self, n_samples, c, randoms=None):
        if randoms is None: # we do need to sample
            randoms = tf.random.normal(shape=(n_samples, self.latent_dim))
        # we need the Cartesian product of randoms and conditions
        z_full = tf.concat([randoms for row in c], axis=0)
        zero = np.zeros(shape=(randoms.shape[0], c.shape[1]))
        c_full = tf.concat([ tf.cast(zero+row, tf.float32) for row in c], axis=0)
        dec_outputs = self.decode(z_full, c_full)
        #return tf.reshape(dec_outputs, shape=(randoms.shape[0],c.shape[0]))
        return \
            tf.transpose(tf.reshape(dec_outputs, shape=(c.shape[0],randoms.shape[0]))), \
            tf.transpose(tf.reshape(c_full,      shape=(c.shape[0],randoms.shape[0])))
    


For each element of our yield curves we specify the time-to-maturity.

In [None]:
condition = np.zeros(shape=yieldCurves.shape) + delta_T
condition[:2,:]

Our VAE accepts inputs as vectors. Since we want to model individual yield curve values, we need to flatten curve values and time-to-maturity values.

In [None]:
x = yieldCurves.flatten()
x.shape = (x.shape[0], 1)
print(x.shape)
c = condition.flatten()
c.shape = (c.shape[0], 1)
print(c.shape)
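
The row-major flattening used above pairs each curve value with its own time-to-maturity, and reshaping with the original shape recovers the matrices. A small sketch with toy data (shapes and values are illustrative only):

```python
import numpy as np

delta_T = np.array([1.0, 2.0, 5.0])                  # toy maturity grid
curves = np.array([[0.010, 0.012, 0.015],
                   [0.020, 0.021, 0.023]])           # two toy curves

condition = np.zeros(shape=curves.shape) + delta_T   # broadcast the grid to every row
x = curves.reshape(-1, 1)                            # shape (6, 1)
c = condition.reshape(-1, 1)                         # shape (6, 1)

pairs = np.hstack([x, c])                            # each row: (zero rate, maturity)
print(pairs)

# reshaping with the original shape undoes the flattening
restored = x.reshape(curves.shape)
```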

Our VAE model has two inputs: curve value and time-to-maturity. As output we only have one quantity: curve value. We also put a lot of emphasis on reconstruction; thus $\alpha$ is very small.

In [None]:
cvae_model = ConditionalVariationalAutoencoder(input_dim=2, hidden_dim=yieldCurves.shape[1], latent_dim=1, output_dim=1, alpha=0.5*1e-4)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.005)
cvae_model.compile(optimizer=optimizer, loss=cvae_model.lossfunction)  # use the CVAE's own loss, not vae_model's

For sample calculation we need to supply the condition (i.e. time-to-maturity) as a row of the condition matrix.

In [None]:
cvae_model.fit(x=(x,c), y=x, epochs=1000, callbacks=[TqdmCallback(verbose=0)], verbose=0)
#
cond = np.reshape(delta_T, (delta_T.shape[0],1))
yieldCurves_cvae, delta_T_s = cvae_model.sample(8, c=cond)
plot_yieldCurves(yieldCurves_cvae)

It seems the fit is not as good as when we learn the full shape of the curves.

We also verify that the data transformations worked out by inspecting the equally re-shaped condition.

In [None]:
delta_T_s[1]
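
The reshaping inside `ConditionalVariationalAutoencoder.sample` can be mimicked in NumPy to see why every row of the returned condition array holds the full maturity grid. The sketch below assumes a single condition column and a scalar output, as in this notebook; the deterministic `randoms` are a stand-in for latent draws so that the layout is easy to read.

```python
import numpy as np

n_samples = 4
cond = np.array([[0.5], [1.0], [2.0]])                       # 3 conditions, shape (3, 1)
randoms = np.arange(n_samples, dtype=float).reshape(-1, 1)   # stand-in for latent draws

# stack one copy of all latent draws per condition row
z_full = np.concatenate([randoms for _ in cond], axis=0)     # shape (12, 1)
c_full = np.concatenate(
    [np.zeros((n_samples, 1)) + row for row in cond], axis=0
)                                                            # shape (12, 1)

# reshape to (conditions, samples), then transpose to (samples, conditions)
z_grid = z_full.reshape(cond.shape[0], n_samples).T
c_grid = c_full.reshape(cond.shape[0], n_samples).T
print(c_grid[1])   # one sample row: the full condition grid
print(z_grid[1])   # the same latent draw repeated across conditions
```

Each row of the transposed arrays thus corresponds to one latent draw evaluated at all maturities, which is what lets the output be plotted as a curve.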