In [42]:
import os

import kfp
from kfp.components import func_to_container_op, InputBinaryFile, OutputBinaryFile, OutputPath

In [17]:
def task(*args, **kwargs):
    """Build a decorator that passes a function to ``func_to_container_op``.

    The positional and keyword arguments given here are captured and forwarded
    after the decorated function, which allows the ``@task(base_image=...)``
    spelling used throughout this notebook.

    Returns:
        A one-argument decorator; applying it yields whatever
        ``func_to_container_op`` returns for the decorated function.
    """
    def wrap(func):
        # The decorated function always goes first; the captured options follow.
        return func_to_container_op(func, *args, **kwargs)

    return wrap

In [30]:
@task(base_image='gcr.io/deeplearning-platform-release/tf2-cpu.2-4')
def load_dataset(
    train_images_file: OutputBinaryFile(),
    test_images_file: OutputBinaryFile()):
  """Fetch MNIST and write its train and test image arrays with ``np.save``.

  The labels returned by ``load_data`` are discarded; only images are written.

  Args:
    train_images_file: Binary output file that receives the training images.
    test_images_file: Binary output file that receives the test images.
  """
  # Imports are kept inside the body — presumably because the function is
  # packaged on its own as a container op; confirm before hoisting them.
  import numpy as np
  import tensorflow as tf

  train_split, test_split = tf.keras.datasets.mnist.load_data()
  # Each split is an (images, labels) pair; index 0 selects the images.
  np.save(train_images_file, train_split[0])
  np.save(test_images_file, test_split[0])

In [39]:
@task(base_image='gcr.io/deeplearning-platform-release/tf2-cpu.2-4')
def preprocess_images(input_images_file: InputBinaryFile(), output_images_file: OutputBinaryFile()):
  """Reshape, rescale and binarize an image array, then save it as float32.

  Args:
    input_images_file: Binary input file holding an array readable by ``np.load``.
    output_images_file: Binary output file that receives the processed array.
  """
  import numpy as np

  raw = np.load(input_images_file)
  sample_count = raw.shape[0]
  # Add a trailing channel axis and divide by 255 (presumably 8-bit pixel
  # values, giving a 0..1 range — confirm against the producer).
  scaled = raw.reshape((sample_count, 28, 28, 1)) / 255.
  # Threshold at 0.5 so every pixel becomes exactly 0.0 or 1.0.
  binarized = np.where(scaled > .5, 1.0, 0.0).astype('float32')
  np.save(output_images_file, binarized)

In [43]:
@task(base_image='gcr.io/deeplearning-platform-release/tf2-cpu.2-4')
def train(
    epochs: int,
    latent_dim: int,
    train_size: int,
    test_size: int,
    batch_size: int,
    train_images_file: InputBinaryFile(),
    test_images_file: InputBinaryFile(),
    output_path: OutputPath()):
  """Train a convolutional VAE and save per-epoch models and metrics.

  After every epoch the encoder and decoder are saved to
  ``{output_path}/{epoch}.encoder`` and ``{output_path}/{epoch}.decoder``, and
  the test-set ELBO plus the training time of that epoch are written to
  ``{output_path}/{epoch}.metrics.json``.

  Args:
    epochs: Number of passes over the training dataset.
    latent_dim: Size of the latent vector; the encoder emits twice as many
      values (mean and log-variance).
    train_size: Shuffle buffer size for the training dataset.
    test_size: Shuffle buffer size for the test dataset.
    batch_size: Number of images per batch for both datasets.
    train_images_file: Binary file with the training images, read by ``np.load``.
    test_images_file: Binary file with the test images, read by ``np.load``.
    output_path: Directory that receives the saved models and metrics files.
  """
  import json
  import os
  import time

  import numpy as np
  import tensorflow as tf

  class CVAE(tf.keras.Model):
    """Convolutional variational autoencoder."""

    def __init__(self, latent_dim):
      super(CVAE, self).__init__()
      self.latent_dim = latent_dim
      self.encoder = tf.keras.Sequential(
          [
              tf.keras.layers.InputLayer(input_shape=(28, 28, 1)),
              tf.keras.layers.Conv2D(
                  filters=32, kernel_size=3, strides=(2, 2), activation='relu'),
              tf.keras.layers.Conv2D(
                  filters=64, kernel_size=3, strides=(2, 2), activation='relu'),
              tf.keras.layers.Flatten(),
              # No activation; the output is split into mean and log-variance.
              tf.keras.layers.Dense(latent_dim + latent_dim),
          ]
      )

      self.decoder = tf.keras.Sequential(
          [
              tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
              tf.keras.layers.Dense(units=7*7*32, activation=tf.nn.relu),
              tf.keras.layers.Reshape(target_shape=(7, 7, 32)),
              tf.keras.layers.Conv2DTranspose(
                  filters=64, kernel_size=3, strides=2, padding='same',
                  activation='relu'),
              tf.keras.layers.Conv2DTranspose(
                  filters=32, kernel_size=3, strides=2, padding='same',
                  activation='relu'),
              # No activation; the decoder returns logits.
              tf.keras.layers.Conv2DTranspose(
                  filters=1, kernel_size=3, strides=1, padding='same'),
          ]
      )

    @tf.function
    def sample(self, eps=None):
      """Decode ``eps`` (or 100 random normal latent vectors) into probabilities."""
      if eps is None:
        eps = tf.random.normal(shape=(100, self.latent_dim))
      return self.decode(eps, apply_sigmoid=True)

    def encode(self, x):
      """Return the (mean, logvar) halves of the encoder output for ``x``."""
      mean, logvar = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)
      return mean, logvar

    def reparameterize(self, mean, logvar):
      """Draw ``z = eps * exp(logvar / 2) + mean`` with standard-normal ``eps``."""
      eps = tf.random.normal(shape=mean.shape)
      return eps * tf.exp(logvar * .5) + mean

    def decode(self, z, apply_sigmoid=False):
      """Decode ``z`` into logits, or into probabilities if ``apply_sigmoid``."""
      logits = self.decoder(z)
      if apply_sigmoid:
        probs = tf.sigmoid(logits)
        return probs
      return logits


  def log_normal_pdf(sample, mean, logvar, raxis=1):
    """Log-density of ``sample`` under a diagonal normal, summed over ``raxis``."""
    log2pi = tf.math.log(2. * np.pi)
    return tf.reduce_sum(
        -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi),
        axis=raxis)


  def compute_loss(model, x):
    """Return the negative single-sample ELBO estimate averaged over the batch."""
    mean, logvar = model.encode(x)
    z = model.reparameterize(mean, logvar)
    x_logit = model.decode(z)
    cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
    logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])
    logpz = log_normal_pdf(z, 0., 0.)
    logqz_x = log_normal_pdf(z, mean, logvar)
    return -tf.reduce_mean(logpx_z + logpz - logqz_x)


  @tf.function
  def train_step(model, x, optimizer):
    """Executes one training step.

    This function computes the loss and gradients, and uses the latter to
    update the model's parameters. Nothing is returned.
    """
    with tf.GradientTape() as tape:
      loss = compute_loss(model, x)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

  train_images = np.load(train_images_file)
  train_dataset = (tf.data.Dataset.from_tensor_slices(train_images)
                  .shuffle(train_size).batch(batch_size))

  test_images = np.load(test_images_file)
  test_dataset = (tf.data.Dataset.from_tensor_slices(test_images)
                  .shuffle(test_size).batch(batch_size))

  optimizer = tf.keras.optimizers.Adam(1e-4)
  model = CVAE(latent_dim)

  # Everything below is written under output_path; create it up front so the
  # metrics write does not depend on an earlier model save having created it.
  os.makedirs(output_path, exist_ok=True)

  for epoch in range(1, epochs + 1):
    start_time = time.time()
    for train_x in train_dataset:
      train_step(model, train_x, optimizer)
    end_time = time.time()
    elapsed = end_time - start_time

    # Average the loss over the test batches; the ELBO is its negation.
    loss = tf.keras.metrics.Mean()
    for test_x in test_dataset:
      loss(compute_loss(model, test_x))
    elbo = -loss.result()
    model.encoder.save(f'{output_path}/{epoch}.encoder')
    model.decoder.save(f'{output_path}/{epoch}.decoder')
    metrics = {'elbo': float(elbo), 'elapsed': elapsed}
    # Use a context manager so the metrics file is flushed and closed each epoch.
    with open(f'{output_path}/{epoch}.metrics.json', 'w') as metrics_file:
      json.dump(metrics, metrics_file)
    print('Epoch: {}, Test set ELBO: {}, time elapse for current epoch: {}'
          .format(epoch, elbo, elapsed))

In [47]:
@kfp.dsl.pipeline(name='digit-generator')
def pipeline(epochs: int):
    """Wire up the digit-generator pipeline: load, preprocess, then train.

    Args:
        epochs: Number of training epochs passed to the ``train`` step.
    """
    dataset_op = load_dataset()
    raw_train = dataset_op.outputs['train_images']
    raw_test = dataset_op.outputs['test_images']

    # The train and test splits go through separate preprocessing steps.
    processed_train = preprocess_images(raw_train).output
    processed_test = preprocess_images(raw_test).output

    hyperparameters = dict(
        latent_dim=2,
        train_size=60000,
        test_size=10000,
        batch_size=32,
    )
    trainer = train(
        epochs=epochs,
        **hyperparameters,
        train_images=processed_train,
        test_images=processed_test)

    # Schedule training on a node with a P100 and request one GPU.
    # NOTE(review): the train step's base image is named `tf2-cpu`; confirm it
    # can actually use the GPU requested here.
    trainer.add_node_selector_constraint('cloud.google.com/gke-accelerator', 'nvidia-tesla-p100')
    trainer.set_gpu_limit(1)
    
# Arguments handed to the pipeline function for this run.
pipeline_arguments = dict(
    epochs=10
)

# Allow the endpoint to be overridden through the KF_PIPELINES_ENDPOINT
# environment variable so the notebook is not tied to a single deployment.
# The original host is the fallback when the variable is unset or empty.
kfp_host = (
    os.environ.get('KF_PIPELINES_ENDPOINT')
    or 'https://1c2f27faaa42f41a-dot-us-east1.pipelines.googleusercontent.com/')

kfp_client = kfp.Client(host=kfp_host)
run = kfp_client.create_run_from_pipeline_func(
    pipeline,
    experiment_name='digit-generator',
    arguments=pipeline_arguments)

In [4]:
def save_test_sample(test_images_path, sample_size, output_path):
  """Save a random sample of the test images to ``output_path``.

  Args:
    test_images_path: Location of the array to sample from, read by ``np.load``.
    sample_size: Number of leading entries kept after shuffling; if the array
      is shorter, all entries are kept.
    output_path: Destination file. It is written through a file object, so the
      name is used exactly as given (no ``.npy`` suffix is appended).
  """
  import numpy as np

  test_images = np.load(test_images_path)
  # shuffle() reorders along the first axis in place, so the leading slice is
  # a random sample without replacement.
  np.random.shuffle(test_images)
  test_sample = test_images[:sample_size]
  # Use a context manager so the handle is flushed and closed deterministically
  # instead of being left to the garbage collector.
  with open(output_path, 'wb') as sample_file:
    np.save(sample_file, test_sample)

In [5]:
def generate_image(
    test_sample_path,
    model_path,
    output_path):
  """Reconstruct the sample images with a saved model and save them as a grid.

  The sample is encoded, a latent vector is drawn from the encoded mean and
  log-variance, and the decoded probabilities are plotted one per grid cell.

  Args:
    test_sample_path: Location of the sample array, read by ``np.load``.
    model_path: Directory containing the saved ``encoder`` and ``decoder``
      models (loaded from ``{model_path}/encoder`` and ``{model_path}/decoder``).
    output_path: File the finished figure is written to.
  """
  import math

  # pyplot was used here without being imported anywhere in the file.
  import matplotlib.pyplot as plt
  import numpy as np
  import tensorflow as tf

  x = np.load(test_sample_path)
  encoder = tf.keras.models.load_model(f'{model_path}/encoder')
  decoder = tf.keras.models.load_model(f'{model_path}/decoder')
  # The encoder output holds the mean and the log-variance side by side.
  mean, logvar = tf.split(encoder(x), num_or_size_splits=2, axis=1)
  z = tf.random.normal(shape=mean.shape) * tf.exp(logvar * .5) + mean
  logits = decoder(z)
  predictions = tf.sigmoid(logits)

  sample_count = predictions.shape[0]
  # Keep the original 4x4 layout for up to 16 images and grow the grid for
  # larger samples instead of failing on subplot index 17.
  grid_side = max(4, int(math.ceil(math.sqrt(sample_count))))

  fig = plt.figure(figsize=(4, 4))
  try:
    for i in range(sample_count):
      plt.subplot(grid_side, grid_side, i + 1)
      plt.imshow(predictions[i, :, :, 0], cmap='gray')
      plt.axis('off')
    # Save once, after every cell is drawn, rather than once per image.
    plt.savefig(f'{output_path}')
  finally:
    # Release the figure so repeated calls (one per epoch) do not pile up.
    plt.close(fig)

In [6]:
def create_animation(filenames, output_file):
  """Assemble image files into one animation, one frame per file, in order.

  Args:
    filenames: Iterable of image paths; each becomes one frame.
    output_file: Destination passed to ``imageio.get_writer``.
  """
  import imageio

  with imageio.get_writer(output_file, mode='I') as frame_writer:
    for frame_path in filenames:
      # Read and append one frame at a time.
      frame_writer.append_data(imageio.imread(frame_path))

In [7]:
def plot_latent_images(model_path, n, digit_size, output_path):
  """Decode an n x n grid of 2-D latent points and save them as one image.

  Grid coordinates are standard-normal quantiles at ``n`` evenly spaced
  probabilities between 0.05 and 0.95.

  Args:
    model_path: Directory containing the saved ``decoder`` model (loaded from
      ``{model_path}/decoder``). Each latent point fed to it has two values.
    n: Number of grid cells per side.
    digit_size: Side length in pixels of each decoded digit; every decoded
      output is reshaped to ``(digit_size, digit_size)``.
    output_path: File the finished figure is written to.
  """
  import matplotlib.pyplot as plt
  import numpy as np
  import tensorflow as tf
  import tensorflow_probability as tfp

  decoder = tf.keras.models.load_model(f'{model_path}/decoder')

  norm = tfp.distributions.Normal(0, 1)
  grid_x = norm.quantile(np.linspace(0.05, 0.95, n))
  grid_y = norm.quantile(np.linspace(0.05, 0.95, n))
  image_width = digit_size*n
  image_height = image_width
  image = np.zeros((image_height, image_width))

  # Rows follow the second latent coordinate and columns the first. The two
  # grids hold identical values, so pairing each name with its own grid keeps
  # the output unchanged while making the loop read correctly.
  for i, yi in enumerate(grid_y):
    for j, xi in enumerate(grid_x):
      z = np.array([[xi, yi]])
      logits = decoder(z)
      x_decoded = tf.sigmoid(logits)
      digit = tf.reshape(x_decoded[0], (digit_size, digit_size))
      image[i * digit_size: (i + 1) * digit_size,
            j * digit_size: (j + 1) * digit_size] = digit.numpy()

  fig = plt.figure(figsize=(10, 10))
  try:
    plt.imshow(image, cmap='Greys_r')
    plt.axis('Off')
    plt.savefig(f'{output_path}')
  finally:
    # Release the figure so repeated calls (one per epoch) do not pile up.
    plt.close(fig)

In [9]:
!mkdir -p data
!mkdir -p data/raw
!mkdir -p data/processed
!mkdir -p data/model
!mkdir -p data/generated


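# NOTE(review): disabled local driver, kept for reference only. It no longer
# matches the definitions earlier in this notebook: `load_dataset` has no
# `output_path` parameter, `train` takes `train_images_file`/`test_images_file`
# rather than `train_images_path`/`test_images_path`, and `train` saves models as
# `{output_path}/{epoch}.encoder` / `.decoder` while this driver points
# `model_path` at `data/model/epoch/{i}`. Update these calls before re-enabling.
# epochs = 10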
# load_dataset(output_path='data/raw')
# preprocess_images('data/raw/train-images.npy', 'data/processed/train-images.npy')
# preprocess_images('data/raw/test-images.npy', 'data/processed/test-images.npy')
# save_test_sample(
#     test_images_path='data/processed/test-images.npy',
#     sample_size=16,
#     output_path='data/sample-images.npy'
# )
# train(
#     epochs=epochs, 
#     latent_dim=2, 
#     train_size=60000,
#     test_size=10000,
#     batch_size=32,
#     train_images_path='data/processed/train-images.npy',
#     test_images_path='data/processed/test-images.npy',
#     output_path='data/model')
# for i in range(1, epochs + 1):
#   generate_image(
#       test_sample_path='data/sample-images.npy',
#       model_path=f'data/model/epoch/{i}',
#       output_path=f'data/generated/{i}.png'
#   )
#   plot_latent_images(
#       model_path=f'data/model/epoch/{i}', n=20, digit_size=28, 
#       output_path=f'data/latent.{i}.png')
    
# create_animation(
#   filenames=[f'data/generated/{i}.png' for i in range(1, epochs + 1)],
#   output_file='data/cvae.gif'    
# )
# create_animation(
#   filenames=[f'data/latent.{i}.png' for i in range(1, epochs + 1)],
#   output_file='data/latent.gif'    
# )