<a id=top-page></a>

# `pykitPIV` demo: Train a convolutional variational autoencoder

In this Jupyter notebook, we show how the machine learning module (`pykitPIV.ml`) can be used to train a convolutional variational autoencoder (CVAE). The trained CVAE model generates new velocity fields ($u$ and $v$ components) that belong to the distribution of some experimental data. Hence, this approach can be used to extend the training data for transfer learning and can help adapt a machine learning model to changing experimental conditions.

This tutorial is adapted from the TensorFlow CVAE tutorial: https://www.tensorflow.org/tutorials/generative/cvae.

<div class="alert alert-block alert-info" style="margin-top: 20px">
<font size="3"><strong>Table of contents:</strong></font>
<br>
<ol>
    <li><a href="#training-data">Create/load training data</a></li>
</ol>
<p></p>
</div>
<br>

***

In [1]:
import numpy as np
import h5py
import tensorflow as tf
import matplotlib.pyplot as plt
import cmcrameri.cm as cmc
from pykitPIV import PIVDatasetPyTorch
from IPython import display
import glob
import imageio
import PIL
import time
from pykitPIV import FlowField, Image, PIVCVAE

2025-05-21 13:42:31.385861: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
# Figure-export settings.
# NOTE(review): `save_images` is not read by any cell visible in this notebook,
# and the plotting calls below pass `filename=None` literally instead of using
# this variable — confirm whether these settings are still needed.
save_images = False
filename = None

In [3]:
# Colormap used by `generate_and_save_images` when rendering generated samples.
cmap = cmc.oslo_r

<a id=training-data></a>

***

## Create/load training data

[Go to the top](#top-page)

Below, we create dummy training data (displacement fields), which can be replaced by data coming from an experiment.

In [None]:
# Number of synthetic fields to generate and the size of each field.
# `size` also provides the first two entries of the CVAE `input_shape`.
n_images = 10000
size = (28, 28)

In [None]:
# Set up the flow-field generator for `n_images` fields of size `size`,
# with no buffer (size_buffer=0) and float32 storage.
# NOTE(review): presumably the fixed `random_seed` makes the generated data
# reproducible between runs — confirm against the pykitPIV documentation.
flowfield = FlowField(n_images=n_images,
                      size=size,
                      size_buffer=0,
                      time_separation=1,
                      dtype=np.float32,
                      random_seed=100)

In [None]:
# Time the generation of the random velocity fields.
tic = time.perf_counter()

flowfield.generate_random_velocity_field(displacement=(1, 1),
                                         gaussian_filters=(5, 10),
                                         n_gaussian_filter_iter=3)

toc = time.perf_counter()

# `perf_counter` returns seconds; report the elapsed time in minutes.
print(f'Total time: {(toc - tic)/60:0.1f} minutes.')

Inspect the training displacement field components:

In [None]:
# Image object whose plotting methods are used below to inspect the fields.
image = Image()

In [None]:
# Attach the generated flow field to the image object
# (presumably required before the field-plotting methods can be called).
image.add_flowfield(flowfield)

In [None]:
# Plot both velocity components, titled $dx$ and $dy$.
# The first positional argument (4) presumably selects the sample index — confirm.
# The trailing semicolon suppresses the notebook's echo of the return value.
image.plot_field(4,
                 field='velocity',
                 with_buffer=True,
                 xlabel='Width [px]',
                 ylabel='Height [px]',
                 title=('$dx$', '$dy$'),
                 cmap=cmc.oslo_r,
                 cbar=True,
                 origin='lower',
                 figsize=(3,3),
                 filename=None);

In [None]:
# Plot the velocity magnitude.
# The first positional argument (7) presumably selects the sample index — confirm.
# The trailing semicolon suppresses the notebook's echo of the return value.
image.plot_field_magnitude(
    7,
    field='velocity',
    with_buffer=True,
    xlabel='Width [px]',
    ylabel='Height [px]',
    cmap=cmc.oslo_r,
    cbar=True,
    figsize=(3, 3),
    filename=None,
);

### Prepare training data for TensorFlow

In [None]:
# Index at which the data set is split: samples [0, split_at) become the
# training set and samples from split_at onwards become the test set.
split_at = 9000

In [None]:
# Min-max normalize each of the two velocity components of every sample
# to the range [0, 1].
# NOTE(review): the result is written back into `vector_field` in place; if
# `flowfield.velocity_field` returns the stored array rather than a copy,
# the data held by `flowfield` is rescaled as well — confirm.
vector_field = flowfield.velocity_field

# View on the first `n_images` samples and their first two components.
components = vector_field[:n_images, :2]

# Shift every (sample, component) slice so that its minimum becomes zero ...
components[...] = components - np.min(components, axis=(2, 3), keepdims=True)

# ... then divide it by its new maximum.
components[...] = components / np.max(components, axis=(2, 3), keepdims=True)

In [None]:
# Split the samples at `split_at` and move axis 1 (the velocity component)
# to the last position, giving channels-last arrays for TensorFlow.
channels_last = (0, 2, 3, 1)

train_images = np.transpose(vector_field[:split_at], channels_last)
test_images = np.transpose(vector_field[split_at:], channels_last)

In [None]:
# Display the shape of the training array.
train_images.shape

In [None]:
# Display the shape of the test array.
test_images.shape

In [None]:
# Number of samples in each subset (length of the leading axis) and the
# mini-batch size used by both data pipelines.
train_size = len(train_images)
test_size = len(test_images)
batch_size = 32

In [None]:
# Wrap both subsets as tf.data pipelines. The shuffle buffer is as large as
# the subset itself, and samples are grouped into batches of `batch_size`.
train_dataset = (
    tf.data.Dataset.from_tensor_slices(train_images)
    .shuffle(train_size)
    .batch(batch_size)
)
test_dataset = (
    tf.data.Dataset.from_tensor_slices(test_images)
    .shuffle(test_size)
    .batch(batch_size)
)

In [None]:
# Display the dataset object to inspect its element specification.
train_dataset

***

## Parameters of the convolutional variational autoencoder

In [None]:
# Number of channels per sample; used as the last entry of `input_shape`.
n_channels = 2

In [None]:
# Channels-last shape of a single sample, passed to the CVAE constructor.
input_shape = (size[0], size[1], n_channels)

***

## Train the variational autoencoder

In [None]:
# Adam optimizer with a learning rate of 1e-4.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)


def log_normal_pdf(sample, mean, logvar, raxis=1):
  """Log-density of a diagonal Gaussian, summed along `raxis`.

  Evaluates log N(sample; mean, exp(logvar)) element-wise and reduces the
  result with a sum over the axis given by `raxis`.
  """
  log_two_pi = tf.math.log(2. * np.pi)
  squared_error = (sample - mean) ** 2.
  elementwise = -.5 * (squared_error * tf.exp(-logvar) + logvar + log_two_pi)
  return tf.reduce_sum(elementwise, axis=raxis)


def compute_loss(model, x):
  """Return the negative ELBO estimate for the batch `x`.

  One latent sample is drawn per input through `model.reparameterize`; the
  estimate log p(x|z) + log p(z) - log q(z|x) is averaged over the batch
  and negated so that it can be minimized.
  """
  mean, logvar = model.encode(x)
  z = model.reparameterize(mean, logvar)

  # Reconstruction term: sigmoid cross-entropy between the decoder logits
  # and the input, summed over axes 1, 2 and 3 of each sample.
  reconstruction_error = tf.nn.sigmoid_cross_entropy_with_logits(
      labels=x, logits=model.decode(z))
  log_likelihood = -tf.reduce_sum(reconstruction_error, axis=[1, 2, 3])

  # Log-density of z under the standard-normal prior (mean 0, log-variance 0)
  # and under the encoder's Gaussian.
  log_prior = log_normal_pdf(z, 0., 0.)
  log_posterior = log_normal_pdf(z, mean, logvar)

  elbo = log_likelihood + log_prior - log_posterior
  return -tf.reduce_mean(elbo)


@tf.function
def train_step(model, x, optimizer):
  """Executes one training step and returns the loss.

  This function computes the loss and gradients, and uses the latter to
  update the model's parameters.

  Args:
    model: Object accepted by `compute_loss` that exposes
      `trainable_variables`.
    x: Batch of input samples passed to `compute_loss`.
    optimizer: Optimizer whose `apply_gradients` updates the variables.

  Returns:
    The loss evaluated on `x` before the parameter update.
  """
  # Record the forward pass so the loss can be differentiated.
  with tf.GradientTape() as tape:
    loss = compute_loss(model, x)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  # The docstring promised the loss, but nothing was returned before.
  return loss

In [None]:
# Number of passes over the training set.
epochs = 10

# set the dimensionality of the latent space to a plane for visualization later
latent_dim = 2

# Number of samples shown in the progress figure; the cell that picks
# `test_sample` asserts that this does not exceed `batch_size`.
num_examples_to_generate = 16

# keeping the random vector constant for generation (prediction) so
# it will be easier to see the improvement.
# NOTE(review): this vector is not used by any cell visible in this notebook;
# the progress figures are produced from `test_sample` instead.
random_vector_for_generation = tf.random.normal(shape=[num_examples_to_generate, latent_dim])

Initialize the convolutional variational autoencoder as a `pykitPIV.ml.PIVCVAE` object:

In [None]:
# Build the CVAE for channels-last samples of shape `input_shape` with a
# `latent_dim`-dimensional latent space.
model = PIVCVAE(input_shape=input_shape, 
                latent_dimension=latent_dim)

In [None]:
def generate_and_save_images(model, epoch, test_sample):
  """Plot the model output for `test_sample` and save the figure as a PNG.

  The samples are encoded, a latent vector is drawn with
  `model.reparameterize`, and the result of `model.sample` is shown on a
  square grid of panels. Only channel 0 of each prediction is displayed.

  Args:
    model: Model exposing `encode`, `reparameterize` and `sample`.
    epoch: Integer used in the output file name
      (`image_at_epoch_<epoch, zero-padded to 4 digits>.png`).
    test_sample: Batch of inputs passed to `model.encode`.
  """
  mean, logvar = model.encode(test_sample)
  z = model.reparameterize(mean, logvar)
  predictions = model.sample(z)

  n_panels = predictions.shape[0]

  # Keep the 4 x 4 layout for up to 16 samples; enlarge the square grid for
  # bigger batches so that `plt.subplot` never gets an out-of-range index.
  grid = max(4, int(np.ceil(np.sqrt(n_panels))))

  fig = plt.figure(figsize=(4, 4))

  for i in range(n_panels):
    plt.subplot(grid, grid, i + 1)
    plt.imshow(predictions[i, :, :, 0], cmap=cmap)
    plt.axis('off')

  plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
  plt.show()

  # Release the figure so that repeated calls (one per epoch) do not
  # accumulate open figures.
  plt.close(fig)

In [None]:
# Pick a sample of the test set for generating output images
# Pick a sample of the test set for generating output images.
# The first batch must hold at least `num_examples_to_generate` samples.
assert batch_size >= num_examples_to_generate
# `take(1)` yields a single batch; keep its first `num_examples_to_generate`
# samples. `test_dataset` is shuffled, so the selection is a random subset.
for test_batch in test_dataset.take(1):
  test_sample = test_batch[0:num_examples_to_generate, :, :, :]

In [None]:
# Snapshot of the untrained model, saved as epoch 0.
generate_and_save_images(model, 0, test_sample)

for epoch in range(1, epochs + 1):

    # One timed optimization pass over the training set.
    start_time = time.time()
    for train_x in train_dataset:
        train_step(model, train_x, optimizer)
    end_time = time.time()

    # Average the loss over all test batches; the ELBO is its negative.
    loss = tf.keras.metrics.Mean()
    for test_x in test_dataset:
        loss(compute_loss(model, test_x))
    elbo = -loss.result()

    # Replace the previous epoch's cell output with the current report.
    display.clear_output(wait=False)
    print(f'Epoch: {epoch}, Test set ELBO: {elbo}, time elapse for current epoch: {end_time - start_time}')
    generate_and_save_images(model, epoch, test_sample)

In [None]:
def display_image(epoch_no):
  """Open the PNG saved for epoch `epoch_no` and return it as a PIL image."""
  image_path = f'image_at_epoch_{epoch_no:04d}.png'
  return PIL.Image.open(image_path)

In [None]:
# Show the figure saved for the last epoch: `epoch` still holds the final
# value assigned by the training loop.
plt.imshow(display_image(epoch))
plt.axis('off')

In [None]:
# Assemble the per-epoch snapshots into an animated GIF.
anim_file = 'cvae.gif'

# Zero-padded epoch numbers make the lexicographic order chronological.
# A dedicated loop-variable name is used so that the notebook-level
# `filename` setting is not overwritten.
frame_paths = sorted(glob.glob('image*.png'))

# Fail early with a clear message instead of creating an empty GIF and
# then failing inside `imageio.imread`.
if not frame_paths:
  raise FileNotFoundError("No 'image*.png' frames found; run the training loop first.")

with imageio.get_writer(anim_file, mode='I') as writer:
  last_frame = None
  for frame_path in frame_paths:
    last_frame = imageio.imread(frame_path)
    writer.append_data(last_frame)
  # Append the final frame once more so the animation lingers on it.
  writer.append_data(last_frame)

***

© K. Zdybał, C. Mucignat, S. Kunz, I. Lunati (2025)