# High Performance Machine Learning lab
StarGAN for image-to-image translation, implemented with the TensorFlow 2.3 Estimator API.

## Author: Paweł Rościszewski
  based on the [StarGAN Estimator example](https://github.com/tensorflow/gan/tree/master/tensorflow_gan/examples/stargan_estimator) by Wesley Qian

In [1]:
# Environment check: list the installed TensorFlow-related packages.
# The commented-out commands are optional one-off setup / version checks.
# !pip install -r requirements.txt
# !python --version
!pip list | grep tensorflow
# print(tf.__version__)

tensorflow               2.3.2
tensorflow-datasets      4.2.0
tensorflow-estimator     2.3.0
tensorflow-gan           2.0.0
tensorflow-hub           0.12.0
tensorflow-metadata      0.30.0
tensorflow-probability   0.11.1
You should consider upgrading via the '/macierz/home/s165452/hpmllab/venv/bin/python3.7 -m pip install --upgrade pip' command.[0m


In [2]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from absl import app
from absl import flags

import tensorflow.compat.v1 as tf

import collections
import os
import sys

import cv2
import numpy as np
from six.moves import xrange  # pylint: disable=redefined-builtin


import tensorflow_gan as tfgan




In [3]:
import tensorflow_datasets as tfds

from tensorflow_gan.examples.stargan import network

from tensorflow_gan.examples.cyclegan import data_provider as cyclegan_dp

from matplotlib import pyplot as plt
%pylab inline

# Switch the `tensorflow.compat.v1` API to TF1-style behavior.
# NOTE(review): eager execution is re-enabled immediately after v2 behavior is
# disabled — confirm that both calls are intended and that their order matters.
tf.disable_v2_behavior()
tf.compat.v1.enable_eager_execution()

# Stop TensorFlow log records from being passed on to ancestor loggers
# (presumably to avoid duplicated lines in the notebook output — confirm).
logger = tf.get_logger()
logger.propagate = False

Populating the interactive namespace from numpy and matplotlib
Instructions for updating:
non-resource variables are not supported in the long term


In [4]:
# --- Input pipeline -------------------------------------------------------
# The number of images in each batch.
batch_size = 4
# The patch size of images.
patch_size = 128

# --- Optimisation ---------------------------------------------------------
# The generator learning rate.
generator_lr = 1e-4
# The discriminator learning rate.
discriminator_lr = 1e-4
# The maximum number of gradient steps.
max_number_of_steps = 1_000_000
# The number of steps after which evaluate the model.
steps_per_eval = 1
# Adam Beta 1 for the Adam optimizer.
adam_beta1 = 0.5
# Adam Beta 2 for the Adam optimizer.
adam_beta2 = 0.999
# Generator:Discriminator training step ratio.
gen_disc_step_ratio = 0.2

# --- Network overrides (None keeps the default networks) ------------------
override_generator_fn = None
override_discriminator_fn = None

# --- Cluster --------------------------------------------------------------
# Name of the TensorFlow master to use.
master = ''
# The number of parameter servers. If the value is 0, then the parameters are
# handled locally by the worker.
ps_tasks = 0
# The Task ID. This value is used when training with multiple workers to
# identify each worker.
task = 0

In [5]:
# Immutable record bundling the run configuration into one object.
HParams = collections.namedtuple(
    'HParams',
    [
        'batch_size',
        'patch_size',
        'generator_lr',
        'discriminator_lr',
        'max_number_of_steps',
        'steps_per_eval',
        'adam_beta1',
        'adam_beta2',
        'gen_disc_step_ratio',
        'master',
        'ps_tasks',
        'task',
    ],
)

# Fields are passed by keyword so that a reordering of the field list cannot
# silently mix up values.
hparams = HParams(
    batch_size=batch_size,
    patch_size=patch_size,
    generator_lr=generator_lr,
    discriminator_lr=discriminator_lr,
    max_number_of_steps=max_number_of_steps,
    steps_per_eval=steps_per_eval,
    adam_beta1=adam_beta1,
    adam_beta2=adam_beta2,
    gen_disc_step_ratio=gen_disc_step_ratio,
    master=master,
    ps_tasks=ps_tasks,
    task=task,
)

In [6]:
def provide_dataset(split, batch_size, patch_size, num_parallel_calls=None,
                    shuffle=True, domains=None):
    """Builds a batched, endlessly repeating dataset of CelebA patches per domain.

    The CelebA split is filtered once per requested attribute, the filtered
    datasets are zipped together (so every element holds one example from each
    domain), and each example is turned into an image patch plus a one-hot
    domain label.

    Args:
      split: Name of the split handed to `tfds.load` (e.g. 'train' or 'test').
      batch_size: The number of images in each batch.
      patch_size: Python int. The patch size to extract.
      num_parallel_calls: Number of parallel calls used by the `map` stage.
      shuffle: Whether to shuffle (both the input files and the examples).
      domains: Non-empty iterable of CelebA attribute names to translate
        between, e.g. `('Narrow_Eyes', 'Oval_Face', 'Goatee')`.

    Returns:
      A `tf.data.Dataset` whose elements are dicts keyed by domain name. Each
      value is a dict with:
        * 'images': the batched patches produced by
            `cyclegan_dp.full_image_to_patch` (presumably
            [batch_size, patch_size, patch_size, 3] floats in [-1, 1] —
            confirm against that helper).
        * 'labels': one-hot domain labels of shape
            [batch_size, len(domains)].

    Raises:
      ValueError: If `domains` is None or empty.
    """
    # Materialise once: `domains` is iterated both for filtering and inside
    # `_preprocess`, so a one-shot iterable would otherwise be exhausted.
    domains = tuple(domains) if domains is not None else ()
    if not domains:
        raise ValueError(
            '`domains` must be a non-empty sequence of CelebA attribute names.')

    ds = tfds.load('celeb_a:2.*.*', split=split, shuffle_files=shuffle, download=False,
                   data_dir='/ssd_local/hpml/tensorflow_datasets/')

    def _filter_pred(attribute):
        """Returns a predicate keeping only examples that have `attribute` set."""
        def _filter(element):
            return element['attributes'][attribute]

        return _filter

    # One filtered view of the dataset per domain, consumed in lock-step.
    dss = tuple([ds.filter(_filter_pred(attribute)) for attribute in domains])
    ds = tf.data.Dataset.zip(dss)

    def _preprocess(*elements):
        """Map elements to the example dicts expected by the model."""
        output_dict = {}
        num_domains = len(elements)
        for idx, (domain, elem) in enumerate(zip(domains, elements)):
            uint8_img = elem['image']
            patch = cyclegan_dp.full_image_to_patch(uint8_img, patch_size)
            # The label encodes the position of the domain in `domains`.
            label = tf.one_hot(idx, num_domains)
            output_dict[domain] = {'images': patch, 'labels': label}
        return output_dict

    ds = (ds
          .map(_preprocess, num_parallel_calls=num_parallel_calls)
          .cache()
          .repeat())
    if shuffle:
        ds = ds.shuffle(buffer_size=10000, reshuffle_each_iteration=True)
    ds = (ds
          .batch(batch_size, drop_remainder=True)
          .prefetch(tf.data.experimental.AUTOTUNE))

    return ds

In [7]:
# ds = provide_dataset(
#     'train', hparams.batch_size, hparams.patch_size,
#     domains=('Narrow_Eyes', 'Oval_Face', 'Goatee'))
# ds

In [8]:
# a, b = ds
# b

In [9]:
def provide_data(split, batch_size, patch_size, num_parallel_calls=None,
                 shuffle=True, domains=None):
    """Fetches one batch of CelebA patches and labels for every domain.

    Builds the dataset with `provide_dataset`, creates a one-shot iterator on
    it and splits the next element into per-domain image and label lists.

    Args:
      split: Either 'train' or 'test'.
      batch_size: The number of images in each batch.
      patch_size: Python int. The patch size to extract.
      num_parallel_calls: Number of threads dedicated to parsing.
      shuffle: Whether to shuffle.
      domains: Name of domains to transform between. Must be in Celeb A dataset.

    Returns:
      A tuple `(images, labels)` of two lists with one entry per domain, in
      the key order of the dataset element:
        * images: the batched 'images' tensors.
        * labels: the batched one-hot 'labels' tensors.
    """
    dataset = provide_dataset(split, batch_size, patch_size, num_parallel_calls,
                              shuffle, domains)
    batch = tf.data.make_one_shot_iterator(dataset).get_next()

    images, labels = [], []
    for domain_name in batch.keys():
        per_domain = batch[domain_name]
        images.append(per_domain['images'])
        labels.append(per_domain['labels'])

    return images, labels

In [10]:
def provide_tfdata(split, batch_size, patch_size, num_parallel_calls=None, shuffle=True, domains=None):
    """Wraps the per-domain image and label batches in a new `tf.data.Dataset`.

    The dataset from `provide_dataset` is read through a one-shot iterator;
    the 'images' and 'labels' entries of the next element are collected into
    two lists (one entry per domain) and handed to
    `tf.data.Dataset.from_tensor_slices`.

    Args:
      split: Either 'train' or 'test'.
      batch_size: The number of images in each batch.
      patch_size: Python int. The patch size to extract.
      num_parallel_calls: Number of threads dedicated to parsing.
      shuffle: Whether to shuffle.
      domains: Name of domains to transform between. Must be in Celeb A dataset.

    Returns:
      The `tf.data.Dataset` built from the `(images, labels)` pair.
      NOTE(review): slicing presumably happens along the domain axis of the
      stacked lists — confirm this is the intended element structure.
    """
    source = provide_dataset(split, batch_size, patch_size, num_parallel_calls,
                             shuffle, domains)
    batch = tf.data.make_one_shot_iterator(source).get_next()

    # Keep these as lists: they are passed to `from_tensor_slices` exactly as
    # the original code passed them.
    images, labels = [], []
    for domain_name in batch.keys():
        images.append(batch[domain_name]['images'])
        labels.append(batch[domain_name]['labels'])

    return tf.data.Dataset.from_tensor_slices((images, labels))

In [11]:
def provide_dataset_batch(split, batch_size, patch_size, num_parallel_calls=None,
                    shuffle=True, domains=None) -> tf.data.Dataset:
    """Input function body for the estimator: batched CelebA patches per domain.

    This used to be a line-for-line copy of `provide_dataset` with a return
    annotation of `tf.Tensor`, although a dataset is returned. It now
    delegates to `provide_dataset`, so the pipeline is defined in one place.

    Args:
      split: Either 'train' or 'test'.
      batch_size: The number of images in each batch.
      patch_size: Python int. The patch size to extract.
      num_parallel_calls: Number of parallel calls used by the `map` stage.
      shuffle: Whether to shuffle.
      domains: Name of domains to transform between. Must be in Celeb A dataset.

    Returns:
      The `tf.data.Dataset` produced by `provide_dataset` for the same
      arguments.
    """
    return provide_dataset(split, batch_size, patch_size, num_parallel_calls,
                           shuffle, domains)


In [12]:
# elements =  ([{"a": 1, "b": "foo"},
#               {"a": 2, "b": "bar"},
#               {"a": 3, "b": "baz"}])
# dataset = tf.data.Dataset.from_generator(lambda: elements, {"a": tf.int32, "b": tf.string})
# # `map_func` takes a single argument of type `dict` with the same keys
# # as the elements.
# # result = dataset.map(lambda d: str(d["a"]) + d["b"])
# dataset.
# dataset = tf.data.Dataset.from_tensor_slices([1, 2, 3])
# for element in dataset:
#     print(element)

In [13]:
# # images, labels = provide_data('train', hparams.batch_size, hparams.patch_size, domains=('Narrow_Eyes', 'Oval_Face', 'Goatee'))
# ds = provide_tfdata('train', hparams.batch_size, hparams.patch_size, domains=('Narrow_Eyes', 'Oval_Face', 'Goatee'))

In [14]:
# tf.executing_eagerly()
# images, labels = provide_data('train', hparams.batch_size, hparams.patch_size, domains=('Narrow_Eyes', 'Oval_Face', 'Goatee'))

In [15]:
# ds = provide_tfdata('train', hparams.batch_size, hparams.patch_size, domains=('Narrow_Eyes', 'Oval_Face', 'Goatee'))

In [16]:
def _get_optimizer(gen_lr, dis_lr, beta1, beta2):
    """Creates the Adam optimizers for the generator and the discriminator.

    Both optimizers share the same beta parameters and use locking; only the
    learning rate differs.

    Args:
      gen_lr: A scalar float `Tensor` or a Python number. Learning rate of the
        generator optimizer.
      dis_lr: A scalar float `Tensor` or a Python number. Learning rate of the
        discriminator optimizer.
      beta1: A scalar float `Tensor` or a Python number. `beta1` of both
        `AdamOptimizer` instances.
      beta2: A scalar float `Tensor` or a Python number. `beta2` of both
        `AdamOptimizer` instances.

    Returns:
      A `(generator_optimizer, discriminator_optimizer)` tuple.
    """
    def _adam(learning_rate):
        return tf.train.AdamOptimizer(
            learning_rate, beta1=beta1, beta2=beta2, use_locking=True)

    return _adam(gen_lr), _adam(dis_lr)

In [17]:
def _define_train_step(gen_disc_step_ratio):
    """Get the training step for generator and discriminator for each GAN step.

    Args:
      gen_disc_step_ratio: A python number. The ratio of generator to
        discriminator training steps.

    Returns:
      GANTrainSteps namedtuple representing the training step configuration.
    """

    if gen_disc_step_ratio <= 1:
        discriminator_step = int(1 / gen_disc_step_ratio)
        return tfgan.GANTrainSteps(1, discriminator_step)
    else:
        generator_step = int(gen_disc_step_ratio)
        return tfgan.GANTrainSteps(generator_step, 1)

In [18]:
def _get_summary_image(estimator, test_images_np):
    """Returns a numpy image of the generator outputs on the test images.

    For every test image the estimator is asked for one prediction per domain.
    The input image and its predictions are concatenated along axis 1 into a
    row, and all rows are concatenated along axis 0. The number of domains is
    taken to be the number of test images.

    Args:
      estimator: Object whose `predict(input_fn)` yields one numpy prediction
        per dataset element.
      test_images_np: Sequence of numpy images with values in [-1, 1].

    Returns:
      The assembled grid, rescaled from [-1, 1] to [0, 1].
    """
    num_domains = len(test_images_np)

    def _make_input_fn(image):
        """Binds `image` so that the input function does not close over a loop variable."""
        def test_input_fn():
            # The same image once per target domain, each with a singleton
            # batch axis, paired with the one-hot label of that domain.
            images = np.expand_dims(np.stack([image] * num_domains), 1)
            labels = tf.stack(
                [tf.one_hot([d], num_domains) for d in xrange(num_domains)])
            # Placeholder for the second component of the dataset elements.
            unused_tensor = tf.zeros(num_domains)
            return tf.data.Dataset.from_tensor_slices(((images, labels),
                                                       unused_tensor))

        return test_input_fn

    img_rows = []
    for image in test_images_np:
        prediction_iterable = estimator.predict(_make_input_fn(image))
        translated = [next(prediction_iterable) for _ in xrange(num_domains)]
        img_rows.append(np.concatenate([image] + translated, 1))

    # Normalize [-1, 1] to [0, 1].
    return (np.concatenate(img_rows, 0) + 1.0) / 2.0

In [19]:
# !wget -O test_image1.jpg https://static01.nyt.com/newsgraphics/2020/11/12/fake-people/4b806cf591a8a76adfc88d19e90c8c634345bf3d/fallbacks/mobile-03.jpg 
# !wget -O test_image2.jpg https://static01.nyt.com/newsgraphics/2020/11/12/fake-people/4b806cf591a8a76adfc88d19e90c8c634345bf3d/fallbacks/mobile-04.jpg
# !wget -O test_image3.jpg https://static01.nyt.com/newsgraphics/2020/11/12/fake-people/4b806cf591a8a76adfc88d19e90c8c634345bf3d/fallbacks/mobile-05.jpg
    
# !convert test_image1.jpg -resize 128x128 test_image1.png
# !convert test_image2.jpg -resize 128x128 test_image2.png
# !convert test_image3.jpg -resize 128x128 test_image3.png

# !file test_image1.png
# !file test_image2.png
# !file test_image3.png

In [20]:
from tensorflow.python.client import device_lib

def get_device(dev_type):
    """Returns the first local device whose `device_type` equals `dev_type`.

    Args:
      dev_type: Device type string to look for, e.g. "CPU" or "GPU".

    Returns:
      The first matching entry of `device_lib.list_local_devices()`, or None
      (after printing a notice) when no device matches.
    """
    candidates = (dev for dev in device_lib.list_local_devices()
                  if dev.device_type == dev_type)
    match = next(candidates, None)
    if match is None:
        print("No matching device found :(")
    return match

In [21]:
print(device_lib.list_local_devices())

[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality {
}
incarnation: 14539651827456077790
, name: "/device:XLA_CPU:0"
device_type: "XLA_CPU"
memory_limit: 17179869184
locality {
}
incarnation: 5125796746441341751
physical_device_desc: "device: XLA_CPU device"
, name: "/device:XLA_GPU:0"
device_type: "XLA_GPU"
memory_limit: 17179869184
locality {
}
incarnation: 3168644697156639013
physical_device_desc: "device: XLA_GPU device"
, name: "/device:GPU:0"
device_type: "GPU"
memory_limit: 5698663232
locality {
  bus_id: 1
  links {
  }
}
incarnation: 1690321202961177756
physical_device_desc: "device: 0, name: GeForce GTX 1060 6GB, pci bus id: 0000:01:00.0, compute capability: 6.1"
]


In [22]:
# Make sure steps integers are consistent.
if hparams.max_number_of_steps % hparams.steps_per_eval != 0:
    raise ValueError('`max_number_of_steps` must be divisible by '
                     '`steps_per_eval`.')

# Create optimizers.
gen_opt, dis_opt = _get_optimizer(hparams.generator_lr,
                                  hparams.discriminator_lr,
                                  hparams.adam_beta1, hparams.adam_beta2)

# Session options: grow GPU memory on demand and allow soft device placement.
config_proto = tf.ConfigProto()
config_proto.gpu_options.allow_growth = True
config_proto.allow_soft_placement = True

from tensorflow.python.client import device_lib

import json

# Describe the cluster through TF_CONFIG before the strategy is constructed.
# NOTE(review): the addresses and the "chief" role are hard-coded; the other
# node presumably runs this notebook with task type "worker" — confirm.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "chief": ["172.20.83.213:23456"],
        "worker": ["172.20.83.210:23456"],
    },
    "task": {"type": "chief", "index": 0}
})

import tensorflow
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
run_config = tf.estimator.RunConfig(session_config=config_proto, train_distribute=strategy)
# Single-machine alternative:
# run_config = tf.estimator.RunConfig(session_config=config_proto)


# Create estimator.
stargan_estimator = tfgan.estimator.StarGANEstimator(
    model_dir='/tmp/models',
    generator_fn=override_generator_fn or network.generator,
    discriminator_fn=override_discriminator_fn or network.discriminator,
    loss_fn=tfgan.stargan_loss,
    generator_optimizer=gen_opt,
    discriminator_optimizer=dis_opt,
    get_hooks_fn=tfgan.get_sequential_train_hooks(
        _define_train_step(hparams.gen_disc_step_ratio)),
    add_summaries=tfgan.estimator.SummaryType.IMAGES,
    config=run_config)

# Get input function for training and test images.
train_input_fn = lambda: provide_dataset_batch(
    'train', hparams.batch_size, hparams.patch_size,
    domains=('Narrow_Eyes', 'Oval_Face', 'Goatee'))


def _load_test_image(path):
    """Reads an image with OpenCV and rescales its pixels from [0, 255] to [-1, 1].

    Raises:
      FileNotFoundError: If OpenCV cannot read `path`. `cv2.imread` signals
        this by returning None, which would otherwise only surface as an
        obscure TypeError in the arithmetic below.
    """
    pixels = cv2.imread(path)
    if pixels is None:
        raise FileNotFoundError(
            f'Test image {path!r} is missing or is not a readable image.')
    return pixels / 255.0 * 2 - 1


test_images_np = np.array(
    [_load_test_image(path) for path in ('test_image1.png',
                                         'test_image2.png',
                                         'test_image3.png')],
    dtype=np.float32)
# Reverse the channel axis (OpenCV loads images as BGR).
test_images_np = test_images_np[:, :, :, ::-1]

INFO:tensorflow:Enabled multi-worker collective ops with available devices: ['/job:chief/replica:0/task:0/device:CPU:0', '/job:chief/replica:0/task:0/device:XLA_CPU:0', '/job:chief/replica:0/task:0/device:XLA_GPU:0', '/job:chief/replica:0/task:0/device:GPU:0']
INFO:tensorflow:Using MirroredStrategy with devices ('/job:chief/task:0/device:GPU:0',)
INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'chief': ['172.20.83.213:23456'], 'worker': ['172.20.83.210:23456']}, task_type = 'chief', task_id = 0, num_workers = 2, local_devices = ('/job:chief/task:0/device:GPU:0',), communication = CollectiveCommunication.AUTO
INFO:tensorflow:TF_CONFIG environment variable: {'cluster': {'chief': ['172.20.83.213:23456'], 'worker': ['172.20.83.210:23456']}, 'task': {'type': 'chief', 'index': 0}}
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:RunConfig initialized for Distribute Coordinator with INDEPENDENT_WORKER mode
INFO:tensorflow:Using config: {'_m

In [23]:
def formatter_log(tensors):
    """Formatter for the logging hook that also plots a preview of the generator.

    Renders the summary image for the module-level `stargan_estimator` and
    `test_images_np`, shows it with matplotlib, and returns the log line.

    Args:
      tensors: Mapping of logged tensor tags to their current values; must
        contain the key "total_iterations".

    Returns:
      The string to be logged for the current iteration.
    """
    preview = _get_summary_image(stargan_estimator, test_images_np)
    plt.imshow(preview)
    plt.show()

    iteration = tensors["total_iterations"]
    return f'Current Iteration: {iteration}'


import time
class PerformanceMeasurementHook(tf.train.SessionRunHook):
    """Session hook that prints the average step time and image throughput.

    After every `every_steps` calls to `after_run` it reports the mean time per
    step and the number of images processed per second in that window.

    Changes from the previous version:
      * Intervals are measured with the monotonic `time.perf_counter()` rather
        than the wall clock, so system clock adjustments cannot distort them.
      * The timer restarts once the session exists, so the first window no
        longer includes session creation.
      * The step counter is reset in `begin`, so a reused hook does not start
        with a partially filled window.
      * `every_steps` is validated; 0 used to cause a ZeroDivisionError.
    """

    def __init__(self, batch_size, every_steps=5):
        """Stores the reporting configuration.

        Args:
          batch_size: Number of images per step, used for the image rate.
          every_steps: Report after this many steps. Must be at least 1.

        Raises:
          ValueError: If `every_steps` is smaller than 1.
        """
        if every_steps < 1:
            raise ValueError('`every_steps` must be at least 1.')
        self.every_steps = every_steps
        self.counter = 0
        self.batch_size = batch_size
        self.start = None

    def begin(self):
        """Resets the window when the hook is about to be used."""
        self.counter = 0
        self.start = time.perf_counter()

    def after_create_session(self, session, coord):
        """Restarts the timer once the session is ready."""
        self.start = time.perf_counter()

    def after_run(self, run_context, run_values):
        """Counts one step and prints the metrics when the window is full."""
        self.counter += 1
        if self.counter < self.every_steps:
            return

        end = time.perf_counter()
        self.counter = 0

        diff = end - self.start

        avg_step = diff / self.every_steps
        print(f"PERFORMANCE METRICS: Average step execution time: {avg_step:.4f}s")

        image_rate = (self.every_steps * self.batch_size) / diff
        print(f"PERFORMANCE METRICS: Average image rate: {image_rate:.4f} img/s")

        # Start the next window only after printing.
        self.start = time.perf_counter()

In [None]:
# NOTE(review): `logging_hook` is created but not attached below; its formatter
# runs a prediction and draws a plot every time it fires — attach it only if
# that per-step preview is wanted.
logging_hook = tf.train.LoggingTensorHook({"total_iterations": "global_step"}, every_n_iter=hparams.steps_per_eval, formatter=formatter_log)
performace_hook = PerformanceMeasurementHook(batch_size=hparams.batch_size, every_steps=2)

# profiler_hook = tf.estimator.ProfilerHook(save_steps=True, output_dir='./', show_dataflow=True, show_memory=True)
# res = stargan_estimator.train(train_input_fn,hooks=[logging_hook, performace_hook, profiler_hook], max_steps=hparams.max_number_of_steps)

# `max_steps` bounds the training: without it TrainSpec trains forever and
# `hparams.max_number_of_steps` is ignored. The performance hook is attached
# here; it was previously created but never used.
tf.estimator.train_and_evaluate(
    stargan_estimator,
    train_spec=tf.estimator.TrainSpec(
        input_fn=train_input_fn,
        max_steps=hparams.max_number_of_steps,
        hooks=[performace_hook]),
    eval_spec=tf.estimator.EvalSpec(input_fn=train_input_fn))


# res = stargan_estimator.train(train_input_fn, hooks=[logging_hook, performace_hook], max_steps=hparams.max_number_of_steps)

INFO:tensorflow:Running `train_and_evaluate` with Distribute Coordinator.
INFO:tensorflow:Running Distribute Coordinator with mode = 'independent_worker', cluster_spec = {'chief': ['172.20.83.213:23456'], 'worker': ['172.20.83.210:23456']}, task_type = 'chief', task_id = 0, environment = None, rpc_layer = 'grpc'
INFO:tensorflow:Using MirroredStrategy with devices ('/job:chief/task:0/device:GPU:0',)
INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'chief': ['172.20.83.213:23456'], 'worker': ['172.20.83.210:23456']}, task_type = 'chief', task_id = 0, num_workers = 2, local_devices = ('/job:chief/task:0/device:GPU:0',), communication = CollectiveCommunication.AUTO
INFO:tensorflow:Using MirroredStrategy with devices ('/job:chief/task:0/device:GPU:0',)
INFO:tensorflow:MultiWorkerMirroredStrategy with cluster_spec = {'chief': ['172.20.83.213:23456'], 'worker': ['172.20.83.210:23456']}, task_type = 'chief', task_id = 0, num_workers = 2, local_devices = ('/job:chief/task:0/devi

INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/models/model.ckpt-0
Instructions for updating:
Use standard file utilities to get mtimes.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/models/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...


In [None]:
# dataset = tf.data.Dataset.from_tensor_slices({'a': ([1, 2], [3, 4]),'b': [5, 6]})

In [None]:
# Sanity check: show the type of the object returned by the training input function.
type(train_input_fn())

In [None]:
# Scratch example: reshape the integers 0..11 into a (3, 4) tensor.
x = tf.reshape(tf.range(12), (3,4))

# Unstack it into three tensors and show the shape of the first one.
p, q, r = tf.unstack(x)
p.shape.as_list()