<a href="https://colab.research.google.com/github/stephenzwj/Kaggle-Competition-Favorita/blob/master/tfOptimizations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import contextlib
import functools
import gc
import multiprocessing
import os
import shutil
import tarfile
import time
import timeit
import urllib.error
import urllib.request

import cv2
import numpy as np
import pandas as pd
import tensorflow as tf

print(tf.__version__)
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))

# Smoke test: make sure we can run an op on the first GPU before benchmarking.
with tf.device("GPU:0"):
  tf.ones(())

# Everything downloaded or generated by this notebook lives under data_root;
# profiler traces are exported to profile_dir.
data_root = "/tmp/demo_images"
profile_dir = os.path.join(data_root, "profiles")
os.makedirs(profile_dir, exist_ok=True)

# XLA_FLAGS is only reported here, not set. It has to be present in the
# environment when the process starts (presumably XLA reads it at init).
# Pointing --xla_gpu_cuda_data_dir at the CUDA install is meant to make XLA and
# ptxas work well together, and helps with scaling.
print("XLA_FLAGS='{}'".format(os.environ.get("XLA_FLAGS")))

2.1.0-dev20191028
Num GPUs Available:  8
XLA_FLAGS='--xla_gpu_cuda_data_dir=/usr/local/cuda'


## Configure task

In [None]:
# Training resolution; must be square (the TFRecord resize path asserts this).
RESOLUTION = (224, 224)
NUM_CHANNELS = 3

num_images_per_label = 50000
NUM_TOTAL_IMAGES = 2 * num_images_per_label  # positives + negatives

# On-disk layout under data_root.
pos_dir = os.path.join(data_root, "positives")
neg_dir = os.path.join(data_root, "negatives")
# Empty marker files for thumbnails whose download raised an HTTPError.
download_failures = os.path.join(data_root, "failed_to_download")
tf_record_dir = os.path.join(data_root, "tfrecord_data")
tf_record_array_dir = os.path.join(data_root, "tfrecord_array_data")

for _dir in (pos_dir, neg_dir, download_failures):
  os.makedirs(_dir, exist_ok=True)
del _dir

## Download the data.

### Grab the prepackaged images.

In [None]:
def prepackaged_data():
  """Grab 100,000 prepackaged images.

  For convenience, a set of 50,000 positives and 50,000 negatives has been
  prepackaged to reduce the time to get up and running.

  Each stage is skipped when its output already exists:
    1. download the tarball into data_root,
    2. extract it to data_root/tf_world_classifier_data,
    3. copy its positives/negatives into pos_dir/neg_dir.
  The download and the extraction each go to a temporary location and are
  moved into place only when complete. An interrupted run therefore redoes the
  unfinished stage instead of reusing half-written output.
  """
  if (len(os.listdir(pos_dir)) >= 50000 and
      len(os.listdir(neg_dir)) >= 50000):
    return  # Already done.

  prepackaged_tar = os.path.join(data_root, "tf_world_classifier_data.tar.gz")
  extracted_root = os.path.join(data_root, "tf_world_classifier_data")

  if not os.path.exists(prepackaged_tar):
    print("Downloading tarball...")
    partial_tar = prepackaged_tar + ".partial"
    urllib.request.urlretrieve(
        "http://download.tensorflow.org/models/official/tf_world_classifier_data.tar.gz",
        partial_tar)
    os.replace(partial_tar, prepackaged_tar)

  # Gate extraction on its own output (not on the download), so a run that
  # downloaded the tarball but did not finish extracting recovers next time.
  if not os.path.isdir(extracted_root):
    print("Extracting files...")
    staging_dir = extracted_root + ".extracting"
    shutil.rmtree(staging_dir, ignore_errors=True)
    with tarfile.open(prepackaged_tar) as f:
      # NOTE(review): extractall trusts member paths in the archive; fine for
      # this first-party tarball, do not reuse for untrusted archives.
      f.extractall(path=staging_dir)
    # The archive's top-level directory is tf_world_classifier_data.
    os.rename(os.path.join(staging_dir, "tf_world_classifier_data"), extracted_root)
    shutil.rmtree(staging_dir)

  for label in ["positives", "negatives"]:
    source_dir = os.path.join(extracted_root, label)
    for fname in os.listdir(source_dir):
      destination = os.path.join(data_root, label, fname)
      if not os.path.exists(destination):
        shutil.copy(os.path.join(source_dir, fname), destination)

prepackaged_data()

### Register positive labels

In [None]:
# OpenImages label IDs (MIDs) with their display names; any image carrying one
# of these labels is treated as a positive ("cat") example.
cat_types = """/m/01yrx,Cat
/m/012c9l,Domestic short-haired cat
/m/012k6q,Ocicat
/m/0172jz,Bobcat
/m/017fsk,Norwegian forest cat
/m/01nq_x,Wild cat
/m/01pdqb,Rusty-spotted cat
/m/01qpsq,Polecat
/m/02j4fs,Madagascar cat
/m/02rcwpb,Malayan cat
/m/03bw_6d,Tom cat
/m/03c_kl,Snowshoe cat
/m/03c_ndy,Aegean cat
/m/03dj64,Black cat
/m/0409r1,Saber-toothed cat
/m/06jdsz,Rex cat
/m/07k6w8,Small to medium-sized cats
/m/08x9c0,Polydactyl cat
/m/0cdnk,Big cats
/m/0g4cd0,Tabby cat
/m/0gvvmf6,Napoleon cat
/m/0k8hs,Domestic long-haired cat"""
# Only the MIDs (text before the first comma) are needed for label lookup.
cat_types_set = {line.partition(",")[0] for line in cat_types.splitlines()}

### Download raw image files

In [None]:
def maybe_log(i, cadence, max_len):
  """Print a progress line every `cadence` items and for the final item.

  Args:
    i: 0-based index of the item just processed.
    cadence: report when the 1-based count is a multiple of this.
    max_len: total number of items; the last one is always reported.
  """
  count = i + 1
  if count % cadence == 0 or count == max_len:
    print("{:>8} / {}, ({:.1f}%)".format(count, max_len, count / max_len * 100))

def _collect_thumbnail_urls():
  """Return {True: [...], False: [...]} thumbnail URLs split by "has a cat label".

  URLs keep the row order of the metadata CSV. That matters because the caller
  shuffles them with a fixed seed.
  """
  train_images_metadata_path = tf.keras.utils.get_file(
      os.path.join(data_root, "train-images-with-labels-with-rotation.csv"),
      "https://storage.googleapis.com/openimages/v5/train-images-with-labels-with-rotation.csv")
  train_annotations_path = tf.keras.utils.get_file(
      os.path.join(data_root, "train-annotations-human-imagelabels.csv"),
      "https://storage.googleapis.com/openimages/v5/train-annotations-human-imagelabels.csv")

  print("Loading metadata CSVs...")
  train_images = pd.read_csv(train_images_metadata_path)
  train_annotations = pd.read_csv(train_annotations_path)

  print("Parsing labels...")
  # Vectorized form of "row.Confidence is truthy and LabelName is a cat MID".
  # Note that NaN != 0, so NaN confidences count as truthy, as before.
  # NOTE(review): presumably Confidence == 0 marks a negative human label.
  is_cat_row = ((train_annotations.Confidence != 0) &
                train_annotations.LabelName.isin(cat_types_set))
  id_is_cat = set(train_annotations.ImageID[is_cat_row])
  print("{} images with cats detected.".format(len(id_is_cat)))

  print("Extracting URLs...")
  urls = train_images.Thumbnail300KURL
  has_url = urls.map(lambda u: isinstance(u, str))  # missing URLs load as NaN
  is_cat_image = train_images.ImageID.isin(id_is_cat)
  return {
      True: list(urls[has_url & is_cat_image]),
      False: list(urls[has_url & ~is_cat_image]),
  }


def _download_urls(urls, dest):
  """Download `urls` in order into `dest` until num_images_per_label files exist.

  Files already present in `dest` count as successes. URLs that previously
  raised HTTPError are skipped via marker files in download_failures. Each
  file is fetched to a temporary path and renamed into place only once
  complete, so an interrupted transfer is never mistaken for a finished image.

  Raises:
    urllib.error.HTTPError: once more than 10 downloads have failed and the
      failures exceed 10% of the URLs tried.
  """
  failures = 0
  successes = 0
  for i, url in enumerate(urls):
    if successes >= num_images_per_label:
      break

    filename = os.path.split(url)[1]
    if filename.endswith("?zz=1"):
      filename = filename[:-len("?zz=1")]

    failure_marker = os.path.join(download_failures, filename)
    if os.path.exists(failure_marker):
      continue  # An earlier run failed to download the file.

    fpath = os.path.join(dest, filename)
    if os.path.exists(fpath):
      successes += 1
      continue

    partial_path = os.path.join(data_root, filename + ".partial")
    try:
      urllib.request.urlretrieve(url, partial_path)
      os.replace(partial_path, fpath)
      successes += 1
    except urllib.error.HTTPError:
      open(failure_marker, "w").close()
      failures += 1
      if failures > 10 and failures / (i + 1) > 0.1:
        raise  # Too many failures.
      continue  # Thumbnails are not guaranteed to exist.
    finally:
      if os.path.exists(partial_path):
        os.remove(partial_path)

    if not (i + 1) % 100:
      print("\r{}".format(i + 1), end="")

  print()  # Terminate the "\r" progress line before any message.
  if successes >= num_images_per_label:
    print("Successfully downloaded {} examples".format(num_images_per_label))
  else:
    print("Warning: only {} of {} examples could be downloaded into {}".format(
        successes, num_images_per_label, dest))


def download_data():
  """Download num_images_per_label cat and non-cat OpenImages thumbnails.

  No-op if pos_dir and neg_dir already hold enough images. Otherwise, any
  existing TFRecords (both raw and pre-resized) are deleted, because they were
  built from the previous image set and would otherwise be silently reused.
  """
  if (len(os.listdir(pos_dir)) >= num_images_per_label and
      len(os.listdir(neg_dir)) >= num_images_per_label):
    return  # Already done.

  for record_dir in (tf_record_dir, tf_record_array_dir):
    if os.path.exists(record_dir):
      # TFRecords will have to be regenerated.
      shutil.rmtree(record_dir)

  thumbnail_urls = _collect_thumbnail_urls()

  print("Selecting data...")
  np.random.seed(0)
  shuffled_positive_urls = np.random.permutation(thumbnail_urls[True])
  shuffled_negative_urls = np.random.permutation(thumbnail_urls[False])
  for urls, dest in [(shuffled_positive_urls, pos_dir), (shuffled_negative_urls, neg_dir)]:
    _download_urls(urls, dest)

download_data()

### Convert to TFRecords (Optional. JPEGs can be used directly.)

In [None]:
def get_paths_and_labels():
  """List every image on disk as a (path, label) pair.

  Positives (label 1) come first, then negatives (label 0), each in
  os.listdir order.
  """
  pairs = []
  for directory, label in ((pos_dir, 1), (neg_dir, 0)):
    pairs.extend((os.path.join(directory, name), label) for name in os.listdir(directory))
  return pairs

def write_to_tfrecords(decoded_resolution=None):
  """Serialize every image in pos_dir/neg_dir into sharded TFRecord files.

  Each record is a tf.train.Example with an "image" bytes feature (JPEG bytes)
  and an int64 "label" feature. Records are written 50 per shard.

  Args:
    decoded_resolution: if falsy, the original JPEG bytes are stored unchanged
      under tf_record_dir. Otherwise each image is decoded to NUM_CHANNELS
      channels, resized to a decoded_resolution x decoded_resolution square,
      re-encoded as JPEG, and stored under
      tf_record_array_dir/<decoded_resolution>.

  Nothing is done if the output directory already exists. Shards are written
  into a temporary sibling directory that is renamed into place only after
  every record has been written. An interrupted conversion is therefore redone
  on the next call, rather than leaving a truncated dataset that is silently
  reused.
  """
  if decoded_resolution:
    record_dir = os.path.join(tf_record_array_dir, str(decoded_resolution))
  else:
    record_dir = tf_record_dir

  if os.path.exists(record_dir):
    return

  staging_dir = record_dir + ".incomplete"
  if os.path.exists(staging_dir):
    shutil.rmtree(staging_dir)  # Left over from an interrupted run.
  os.makedirs(staging_dir)

  print("Converting images to TFRecords...")
  records_per_shard = 50
  path_template = os.path.join(staging_dir, "shard_{0:04d}.tfrecords")

  writer = None
  try:
    for i, (image_path, label) in enumerate(get_paths_and_labels()):
      if i % records_per_shard == 0:
        if writer is not None:
          writer.close()
        writer = tf.io.TFRecordWriter(path_template.format(i // records_per_shard))

      with open(image_path, "rb") as f:
        image_bytes = f.read()

      if decoded_resolution:
        # TODO(robieta): make this faster with imap
        # channels=NUM_CHANNELS makes the decoder emit RGB even for greyscale
        # JPEGs, replacing the old decode-then-tile special case.
        image = tf.io.decode_jpeg(image_bytes, channels=NUM_CHANNELS)
        image = tf.cast(image, tf.float32)
        image = tf.image.resize(image, (decoded_resolution,) * 2)
        image_bytes = tf.io.encode_jpeg(tf.cast(image, tf.uint8)).numpy()

      record_bytes = tf.train.Example(features=tf.train.Features(feature={
          "image": tf.train.Feature(bytes_list=tf.train.BytesList(value=[image_bytes])),
          "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label]))
      })).SerializeToString()

      writer.write(record_bytes)
  finally:
    # Close the current shard even if conversion fails part-way.
    if writer is not None:
      writer.close()

  os.rename(staging_dir, record_dir)
  print("TFRecord conversion complete.")


# Glob over the shards that hold the original, un-resized JPEG bytes.
RECORD_PATTERN = os.path.join(tf_record_dir, "*.tfrecords")
# Template: format with the square side length to glob the pre-resized shards.
RESIZED_RECORD_PATTERN = os.path.join(tf_record_array_dir, "{}", "*.tfrecords")
# Feature spec matching the tf.train.Example records built by write_to_tfrecords.
RECORD_SCHEMA = dict(
    image=tf.io.FixedLenFeature([], dtype=tf.string),
    label=tf.io.FixedLenFeature([1], dtype=tf.int64),
)

write_to_tfrecords()

assert RESOLUTION[0] == RESOLUTION[1], "Resize is hard coded to square images."
write_to_tfrecords(decoded_resolution=RESOLUTION[0])

## Model and data setup.

In [None]:
def get_input_shape():
  """Per-example model input shape for Keras' current image_data_format.

  Returns RESOLUTION + (NUM_CHANNELS,) for "channels_last", and
  (NUM_CHANNELS,) + RESOLUTION for "channels_first".

  Raises:
    ValueError: for any other data format.
  """
  data_format = tf.keras.backend.image_data_format()
  if data_format == "channels_first":
    return (NUM_CHANNELS,) + RESOLUTION
  if data_format == "channels_last":
    return RESOLUTION + (NUM_CHANNELS,)
  raise ValueError("Unknown format.")


# (height, width, channels): the layout tf.io.decode_jpeg produces. The dataset
# code transposes to channels_first itself when Keras is configured that way.
NHWC_INPUT_SHAPE = (*RESOLUTION, NUM_CHANNELS)


def transform_image(image):
  """Random augmentation used by both the input pipeline and the on-device Lambda.

  The steps are:
    1. Scale pixels from [0, 255] to [0, 1].
    2. Randomly flip left/right, then up/down.
    3. Add Gaussian noise with mean -0.5 and stddev 0.1.
  The noise mean also recenters the image around 0. The function is called
  both on single images (dataset map) and on batches (Keras Lambda layer).
  """
  scaled = image / 255.0
  flipped = tf.image.random_flip_up_down(tf.image.random_flip_left_right(scaled))
  noise = tf.random.normal(tf.shape(flipped), mean=-0.5, stddev=0.1, dtype=flipped.dtype)
  return flipped + noise


def make_model(input_dtype=tf.float32, transform_on_device=False):
  """Build the benchmark classifier: ImageNet MobileNetV2 plus a one-logit head.

  Args:
    input_dtype: dtype of the Input layer (float16 when mixed precision is on).
    transform_on_device: if True, transform_image runs as a Lambda layer at
      the start of the model instead of in the tf.data pipeline.

  Returns:
    A tf.keras Model named "cat_classifier" emitting one logit per example.
  """
  input_shape = get_input_shape()
  inputs = tf.keras.layers.Input(shape=input_shape, dtype=input_dtype)
  backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(
      include_top=False,
      weights="imagenet",
      input_shape=input_shape,
      pooling="avg",
  )

  features = inputs
  if transform_on_device:
    features = tf.keras.layers.Lambda(transform_image)(features)

  logits = tf.keras.layers.Dense(1, activation=None)(backbone(features))
  return tf.keras.models.Model(inputs, logits, name="cat_classifier")

## Training loop

But wait, this is much more complicated than the slides...

Indeed. This is because it runs all of the options discussed, tries to clean up after itself (hence the context managers), collects profiles, and provides accurate steady-state measurements.

### Toggles for various optimizations:

In [None]:
@contextlib.contextmanager
def stop_profiler():
  """Guarantee the TensorFlow profiler/tracer is off when the block exits.

  Traces from different runs would be mixed together and become hard to
  interpret. So tf.summary.trace_off() is called on every exit path, including
  when the training loop raises.
  """
  try:
    yield
  finally:
    tf.summary.trace_off()

@contextlib.contextmanager
def use_mixed_precision(loss_scale):
  """Install a "mixed_float16" global Keras policy for the duration of the block.

  Args:
    loss_scale: forwarded to the Policy, e.g. "dynamic" or a fixed number.

  The global policy that was active on entry is restored on exit, even on error.
  """
  previous_policy = tf.keras.mixed_precision.experimental.global_policy()
  try:
    tf.keras.mixed_precision.experimental.set_policy(
        tf.compat.v2.keras.mixed_precision.experimental.Policy(
            "mixed_float16", loss_scale=loss_scale))
    yield
  finally:
    tf.keras.mixed_precision.experimental.set_policy(previous_policy)


@contextlib.contextmanager
def enable_xla():
  """Enable XLA JIT compilation for the block, then restore the prior setting.

  The JIT setting active on entry (from tf.config.optimizer.get_jit()) is put
  back on exit, rather than unconditionally disabling JIT. This leaves an
  already-enabled JIT intact and keeps the context safe to nest.
  """
  previous = tf.config.optimizer.get_jit()
  try:
    tf.config.optimizer.set_jit(True)
    yield
  finally:
    tf.config.optimizer.set_jit(previous)


_THREADS_PER_GPU = 2
@contextlib.contextmanager
def tuning_context():
  """Hand tuned model configurations.

  Historically these knobs have improved performance, but as of 10/28/2019 they
  actually hurt performance. However they are provided simply for completeness
  to show some of the lower level knobs.

  Inside the block:
    - TF_GPU_THREAD_MODE=gpu_private
    - TF_GPU_THREAD_COUNT=_THREADS_PER_GPU
    - Keras image data format "channels_first"
  On exit, whatever values were active before are restored (env vars that were
  unset are removed again), instead of forcing "channels_last" and deleting
  pre-existing env settings.

  NOTE(review): these env vars are presumably read when GPU devices are
  created. This notebook touches the GPU before any training, so confirm they
  still take effect when set here.
  """
  tuned_env = {
      "TF_GPU_THREAD_MODE": "gpu_private",
      "TF_GPU_THREAD_COUNT": str(_THREADS_PER_GPU),
  }
  saved_env = {key: os.environ.get(key) for key in tuned_env}
  saved_format = tf.keras.backend.image_data_format()
  try:
    os.environ.update(tuned_env)
    tf.keras.backend.set_image_data_format("channels_first")
    yield
  finally:
    for key, value in saved_env.items():
      if value is None:
        os.environ.pop(key, None)
      else:
        os.environ[key] = value
    tf.keras.backend.set_image_data_format(saved_format)

In [None]:
@contextlib.contextmanager
def null_context():
  """No-op context manager, used in place of a toggle that is switched off."""
  yield None

def null_decorator(f):
  """Identity decorator, used in place of tf.function when it is disabled."""
  decorated = f
  return decorated

def train_model(data_fn, global_batch_size, use_tf_function=False, 
                strategy=None, xla=False, mixed_precision=False, 
                loss_scale="dynamic", collect_profile=False, tuned=False, 
                transform_on_device=False):
  """Train the cat classifier for a fixed schedule and print its throughput.

  Schedule:
    1. 5 burn-in steps.
    2. Optionally 5 profiled steps (trace exported to profile_dir).
    3. 30 timed steady-state steps.
  Then the mean step time and images/sec are printed.

  Args:
    data_fn: dataset factory called with batch_size= and dtype= (plus
      transform_on_device= when a strategy is used). It must yield
      (features, labels) batches.
    global_batch_size: batch size summed over all replicas.
    use_tf_function: compile the training step with tf.function.
    strategy: optional tf.distribute strategy. When set, every step runs
      through strategy.experimental_run_v2, compiled or not.
    xla: enable XLA JIT compilation for the run.
    mixed_precision: use the "mixed_float16" policy with a LossScaleOptimizer.
    loss_scale: loss scale for mixed precision ("dynamic" or a number).
    collect_profile: capture a profiler trace after burn-in.
    tuned: apply tuning_context and a private tf.data threadpool
      (requires strategy).
    transform_on_device: run augmentation inside the model (requires strategy).

  Raises:
    RuntimeError: if the dataset is exhausted before any step is timed.
  """

  # Ensure runs are independent.
  tf.keras.backend.clear_session()
  gc.collect()
  time.sleep(3)

  if tuned:
    assert strategy, "Tuned version assumes a distribuation strategy is present."

  dtype = tf.float16 if mixed_precision else tf.float32
  strategy_scope = strategy.scope() if strategy else null_context()
  xla_scope = enable_xla() if xla else null_context()
  precision_scope = use_mixed_precision(loss_scale) if mixed_precision else null_context()
  tuning_scope = tuning_context() if tuned else null_context()

  with strategy_scope, xla_scope, precision_scope, stop_profiler(), tuning_scope:
    model = make_model(input_dtype=dtype, transform_on_device=transform_on_device)
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    if mixed_precision:
      optimizer = tf.keras.mixed_precision.experimental.LossScaleOptimizer(optimizer, loss_scale=loss_scale)

    def replica_step(features, labels):
      """One forward/backward/update on a single replica's share of the batch."""
      with tf.GradientTape() as tape:
        # Compute the loss in float32 even when the model runs in float16.
        logits = tf.cast(model(features, training=True), tf.float32)
        replica_loss = tf.nn.sigmoid_cross_entropy_with_logits(
            labels=tf.cast(labels, tf.float32), logits=logits)
        loss = tf.nn.compute_average_loss(replica_loss, global_batch_size=global_batch_size)
        # In a custom training loop a LossScaleOptimizer does not scale the
        # loss by itself. Scale it here and unscale the gradients below,
        # otherwise float16 gradients are never protected from underflow.
        scaled_loss = optimizer.get_scaled_loss(loss) if mixed_precision else loss
      grads = tape.gradient(scaled_loss, model.trainable_variables)
      if mixed_precision:
        grads = optimizer.get_unscaled_gradients(grads)
      optimizer.apply_gradients(zip(grads, model.trainable_variables))
      return loss

    if strategy:
      # Distributed inputs must always be dispatched through the strategy,
      # whether or not the step is compiled.
      def distributed_step(features, labels):
        return strategy.experimental_run_v2(replica_step, (features, labels))
      step_fn = distributed_step
    else:
      step_fn = replica_step
    if use_tf_function:
      step_fn = tf.function(step_fn)

    if strategy:
      dataset = data_fn(batch_size=global_batch_size, dtype=dtype, 
                        transform_on_device=transform_on_device)
      if tuned:
        options = tf.data.Options()
        # Reserve _THREADS_PER_GPU + 1 cores per replica; always leave tf.data
        # at least one thread, even on small machines.
        private_threads = max(1, multiprocessing.cpu_count() -
                              strategy.num_replicas_in_sync * (_THREADS_PER_GPU + 1))
        options.experimental_threading.private_threadpool_size = private_threads
        dataset = dataset.with_options(options)
      data = strategy.experimental_distribute_dataset(dataset)
    else:
      assert not transform_on_device
      data = data_fn(batch_size=global_batch_size, dtype=dtype)

    schedule = [
        5,                             # Burn in
        5 if collect_profile else 0,   # Profiling
        30,                            # Steady state throughput
    ]
    start_time = None
    iter_count = 0

    for inputs in data:
      step_fn(*inputs)

      # Burn in
      if schedule[0]:
        schedule[0] -= 1
        if not schedule[0]:
          print("Burn in complete.")
          if schedule[1]:
            time.sleep(2)  # Let running ops finish to start from a clean trace.
            tf.summary.trace_on(profiler=True)
          else:
            # Skip straight to steady state throughput
            start_time = timeit.default_timer()
            iter_count = 0
        continue

      # Op profiler
      if schedule[1]:
        schedule[1] -= 1
        if not schedule[1]:
          tf.summary.trace_export(name="my_trace", profiler_outdir=profile_dir)
          start_time = timeit.default_timer()
          iter_count = 0
        continue

      # Profile steady state execution
      schedule[2] -= 1
      iter_count += 1
      if not schedule[2]:
        break

    if not iter_count:
      raise RuntimeError(
          "Dataset was exhausted before any steady-state step was timed; "
          "use more data or a smaller batch size.")

    run_time = timeit.default_timer() - start_time
    step_time = run_time / iter_count
    print("{} steps".format(iter_count))
    print("Mean step time: {:>6.2f} sec".format(step_time))
    print("Images / sec:   {:>6d}".format(int(global_batch_size / step_time)))


## First pass at a training function.

### Define a generator based data pipeline

In [None]:
def random_flip(image):
  """Independently flip an image left/right and top/bottom, each with p=0.5.

  The previous cv2.flip codes were wrong. A horizontal-only draw rotated the
  image 180 degrees (code -1), and a vertical-only draw mirrored it
  left/right (code 1). Plain numpy slicing makes each axis explicit.

  Args:
    image: array laid out as (height, width[, channels]), e.g. from cv2.imread.

  Returns:
    The possibly flipped image. Flips are numpy views of `image`, not copies.
  """
  hflip = np.random.random() > 0.5
  vflip = np.random.random() > 0.5
  if hflip:
    image = image[:, ::-1]  # mirror along the width axis
  if vflip:
    image = image[::-1]  # mirror along the height axis
  return image

def normalize_and_add_noise(image):
  """Map pixels to float32 in [-0.5, 0.5], then add N(0, 0.1) noise.

  Args:
    image: integer image array with values in [0, 255].

  Returns:
    A new float32 array with the same shape as `image`.
  """
  noise = np.random.normal(loc=0, scale=0.1, size=image.shape)
  centered = image.astype(np.float32) / 255 - 0.5
  centered += noise
  return centered

def make_batch(features, labels):
  """Stack buffered examples into (x, y) tensors, then empty both buffers.

  Args:
    features: list of equally shaped image arrays; cleared in place.
    labels: list of scalar labels; cleared in place.

  Returns:
    x: tensor of the stacked features, with a new leading batch axis.
    y: float32 tensor of shape (len(labels), 1).
  """
  x = tf.convert_to_tensor(np.stack(features, axis=0))
  y = tf.convert_to_tensor(np.asarray(labels, dtype=np.float32).reshape(-1, 1))
  del features[:]
  del labels[:]
  return x, y

def data_generator(batch_size, **kwargs):
  """Pure-Python (OpenCV + numpy) input pipeline yielding (x, y) batches.

  Makes one pass over all images in random order. Each image is:
    1. read with OpenCV and converted to RGB,
    2. resized to RESOLUTION,
    3. randomly flipped,
    4. mapped to [-0.5, 0.5] and perturbed with Gaussian noise.
  Leftover examples that do not fill a batch are dropped.

  Args:
    batch_size: examples per yielded batch.
    **kwargs: accepted for interface parity with the tf.data factories
      (e.g. dtype) and ignored; batches are always float32.

  Raises:
    ValueError: if OpenCV cannot read one of the image files.
  """
  paths_and_labels = get_paths_and_labels()
  features, labels = [], []
  # Shuffle indices, not the (path, label) tuples themselves: np.random.permutation
  # on the tuples builds a string array and turns the int labels into strings.
  for index in np.random.permutation(len(paths_and_labels)):
    image_path, label = paths_and_labels[index]
    image = cv2.imread(image_path)
    if image is None:
      raise ValueError("Could not read image: {}".format(image_path))

    # OpenCV decodes to BGR; the tf.data pipelines decode to RGB.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Resize to training resolution (cv2 takes (width, height); RESOLUTION is square).
    image = cv2.resize(image, RESOLUTION)

    # Randomly horizontal and vertical flip
    image = random_flip(image)

    # Normalize, center, and add Gaussian noise
    image = normalize_and_add_noise(image)

    features.append(image)
    labels.append(label)
    if len(features) == batch_size:
      yield make_batch(features, labels)

In [None]:
# Baseline: Python generator input pipeline, eager training step, no optimizations.
for batch_size in (32, 64, 128):
  print("Batch size: {}".format(batch_size))
  train_model(data_generator, global_batch_size=batch_size)
  print()

Batch size: 32
Burn in complete.
30 steps
Mean step time:   0.48 sec
Images / sec:       66

Batch size: 64
Burn in complete.
30 steps
Mean step time:   0.85 sec
Images / sec:       75

Batch size: 128
Burn in complete.
30 steps
Mean step time:   1.60 sec
Images / sec:       80



## Add tf.data

### Use the images directly

In [None]:
def make_jpg_dataset(batch_size, dtype=tf.float32, transform_on_device=False, 
                     already_resized=False):
  """tf.data pipeline that reads and decodes the raw JPEG files directly.

  Args:
    batch_size: examples per batch; the final partial batch is dropped.
    dtype: dtype of the yielded images and labels.
    transform_on_device: if True, skip transform_image here because the model
      applies it.
    already_resized: unsupported for raw JPEGs (see make_dataset).

  Returns:
    A prefetched dataset of (image, label) batches. Labels have shape
    (batch, 1) and are 1 for files under pos_dir, 0 otherwise.

  Raises:
    NotImplementedError: if already_resized is True.
  """
  if already_resized:
    raise NotImplementedError(
        "`already_resized` is only implemented for the TFRecords path.")

  # list_files returns "<dir>/<name>" paths. Compare against the directory
  # prefix directly instead of building a regex from the path, which would
  # misbehave if data_root contained regex metacharacters.
  positive_prefix = pos_dir + "/"
  positive_prefix_len = len(positive_prefix.encode("utf-8"))  # substr counts bytes

  def get_bytes_and_label(filepath):
    image_bytes = tf.io.read_file(filepath)
    label = tf.equal(tf.strings.substr(filepath, 0, positive_prefix_len), positive_prefix)
    return image_bytes, tf.expand_dims(label, 0)

  def process_image(image_bytes, label):
    # channels=NUM_CHANNELS decodes greyscale JPEGs straight to RGB.
    image = tf.io.decode_jpeg(image_bytes, channels=NUM_CHANNELS)
    image = tf.image.resize(image, RESOLUTION)
    # tf.image.resize always returns float32, so cast afterwards; casting
    # before the resize silently discarded the requested dtype (e.g. float16).
    image = tf.cast(image, dtype)
    image.set_shape(NHWC_INPUT_SHAPE)

    if not transform_on_device:
      image = transform_image(image)

    if tf.keras.backend.image_data_format() == "channels_first":
      image = tf.transpose(image, (2, 0, 1))

    return image, tf.cast(label, dtype)

  dataset = tf.data.Dataset.list_files([pos_dir + "/*", neg_dir + "/*"])
  dataset = dataset.shuffle(NUM_TOTAL_IMAGES)
  dataset = dataset.map(get_bytes_and_label, num_parallel_calls=tf.data.experimental.AUTOTUNE)
  dataset = dataset.map(process_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
  dataset = dataset.batch(batch_size, drop_remainder=True)
  return dataset.prefetch(tf.data.experimental.AUTOTUNE)

### Use TFRecords

In [None]:
def make_dataset(batch_size, dtype=tf.float32, transform_on_device=False, 
                 already_resized=False):
  """tf.data pipeline over the TFRecord shards written by write_to_tfrecords.

  Args:
    batch_size: examples per batch; the final partial batch is dropped.
    dtype: dtype of the yielded images and labels.
    transform_on_device: if True, skip transform_image here because the model
      applies it.
    already_resized: read the pre-resized shards for RESOLUTION[0] and skip
      the resize.

  Returns:
    A prefetched dataset of (image, label) batches; labels have shape (batch, 1).
  """
  def parse_fn(record):
    record = tf.io.parse_single_example(record, RECORD_SCHEMA)
    # channels=NUM_CHANNELS decodes greyscale JPEGs straight to RGB.
    image = tf.io.decode_jpeg(record["image"], channels=NUM_CHANNELS)
    if not already_resized:
      image = tf.image.resize(image, RESOLUTION)
    # Cast last: tf.image.resize always returns float32, which previously
    # overrode the requested dtype whenever the images were not pre-resized.
    image = tf.cast(image, dtype)
    image.set_shape(NHWC_INPUT_SHAPE)

    if not transform_on_device:
      image = transform_image(image)

    if tf.keras.backend.image_data_format() == "channels_first":
      image = tf.transpose(image, (2, 0, 1))

    return image, tf.cast(record["label"], dtype)

  pattern = (
      RESIZED_RECORD_PATTERN.format(RESOLUTION[0]) if already_resized
      else RECORD_PATTERN)

  dataset = tf.data.Dataset.list_files(pattern)
  dataset = dataset.interleave(tf.data.TFRecordDataset, num_parallel_calls=tf.data.experimental.AUTOTUNE)
  dataset = dataset.shuffle(4 * batch_size)
  dataset = dataset.map(parse_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE)
  dataset = dataset.batch(batch_size, drop_remainder=True)
  return dataset.prefetch(tf.data.experimental.AUTOTUNE)

### *already_resized* and *transform_on_device*

Even with maximum parallelization, the CPU can only produce a bit over 3000 examples per second. This is fine for 1 GPU training since the GPU maxes out in the low 2000s, but it would prevent reasonable scaling to more GPUs. This is due to two principal bottlenecks:

#### Native image size

The downloaded thumbnails tend to be around 400x600 resolution, whereas we're training at 224x224. This means that we have to move approximately 6x as many bytes into memory, spend a correspondingly long time decoding the JPEGs, and incur an extra memcpy for the resize. It turns out to be quite important to resize the images to 224x224 ahead of time and use those resized images in the input pipeline.

#### Random augmentation

The CPU simply cannot add noise to the images quickly enough to keep up with the GPU, so to maintain performance for the multi-GPU case we have to move those transformations from the input pipeline to the start of the model. Even though that puts them on the critical path, the GPU can process the augmentation so quickly that it isn't an issue.

In [None]:
def measure_dataset_throughput(dataset_fn, label="", batch_size=2048,
                               num_batches=50, warmup_batches=4):
  """Print the images/sec an input pipeline sustains on its own (no model).

  The first `warmup_batches` batches are excluded. The timer starts when the
  last warm-up batch arrives, which skips start-up and buffer filling. The
  remaining batches, up to `num_batches` in total, are timed. The rate is
  rounded down to the nearest 100.

  Args:
    dataset_fn: factory called as dataset_fn(batch_size=batch_size); the
      result must support .take(n) and iteration (e.g. a tf.data.Dataset).
    label: left-aligned row label for the printed line.
    batch_size: batch size requested from dataset_fn.
    num_batches: total batches pulled, including warm-up.
    warmup_batches: batches consumed before timing starts; must be >= 1.

  Raises:
    ValueError: if the arguments leave nothing to time, or the dataset runs
      out before any timed batch (previously an UnboundLocalError or
      ZeroDivisionError).
  """
  if warmup_batches < 1 or num_batches <= warmup_batches:
    raise ValueError(
        "Need 1 <= warmup_batches < num_batches, got warmup_batches={}, "
        "num_batches={}.".format(warmup_batches, num_batches))

  start_time = None
  timed_batches = 0
  for i, _ in enumerate(dataset_fn(batch_size=batch_size).take(num_batches)):
    if i == warmup_batches - 1:
      start_time = timeit.default_timer()
    elif i >= warmup_batches:
      timed_batches += 1

  if not timed_batches:
    raise ValueError(
        "Dataset produced fewer than {} batches; nothing was timed.".format(
            warmup_batches + 1))

  step_time = (timeit.default_timer() - start_time) / timed_batches
  images_per_sec = batch_size / step_time // 100 * 100
  print("{:<45}  {:>6.0f} Images / sec".format(label, images_per_sec))


def make_synthetic_dataset(batch_size, dtype=tf.float32, **kwargs):
  """All-zeros dataset shaped like the real pipelines, to measure the model alone.

  Yields (2 * num_images_per_label) // batch_size batches of (images, labels).
  Images have shape (batch_size,) + get_input_shape() and labels
  (batch_size, 1). Extra keyword arguments such as transform_on_device are
  ignored.
  """
  def zeros_example(_):
    images = tf.zeros(shape=get_input_shape(), dtype=dtype)
    labels = tf.zeros(shape=(1,), dtype=dtype)
    return images, labels

  return (tf.data.Dataset.range(2 * num_images_per_label)
          .map(zeros_example, num_parallel_calls=tf.data.experimental.AUTOTUNE)
          .batch(batch_size, drop_remainder=True)
          .prefetch(tf.data.experimental.AUTOTUNE))


# Input-pipeline-only throughput for each variant, all producing float16.
# Synthetic data gives an upper bound, ensuring the model is not input bound.
_PIPELINES = [
    (functools.partial(make_jpg_dataset, dtype=tf.float16),
     "Use JPEGs directly"),
    (functools.partial(make_dataset, dtype=tf.float16),
     "Use TFRecords"),
    (functools.partial(make_dataset, dtype=tf.float16, transform_on_device=True,
                       already_resized=True),
     "Use TFRecords with scaling optimizations"),
    (functools.partial(make_synthetic_dataset, dtype=tf.float16),
     "Synthetic data."),
]
for _dataset_fn, _label in _PIPELINES:
  measure_dataset_throughput(_dataset_fn, _label)

Use JPEGs directly                               3200 Images / sec
Use TFRecords                                    3500 Images / sec
Use TFRecords with scaling optimizations        22800 Images / sec
Synthetic data.                                 52200 Images / sec


In [None]:
# tf.data pipeline over raw JPEGs; the training step still runs eagerly.
for batch_size in (32, 64, 128):
  print("Batch size: {}".format(batch_size))
  train_model(make_jpg_dataset, global_batch_size=batch_size)
  print()

Batch size: 32
Burn in complete.
30 steps
Mean step time:   0.19 sec
Images / sec:      172

Batch size: 64
Burn in complete.
30 steps
Mean step time:   0.24 sec
Images / sec:      266

Batch size: 128
Burn in complete.
30 steps
Mean step time:   0.37 sec
Images / sec:      348



## Add tf.function

In [None]:
# Same pipeline, with the training step compiled by tf.function.
for batch_size in (32, 64, 128):
  print("Batch size: {}".format(batch_size))
  train_model(make_jpg_dataset, global_batch_size=batch_size, use_tf_function=True)
  print()

Batch size: 32
Burn in complete.
30 steps
Mean step time:   0.10 sec
Images / sec:      307

Batch size: 64
Burn in complete.
30 steps
Mean step time:   0.20 sec
Images / sec:      321

Batch size: 128
Burn in complete.
30 steps
Mean step time:   0.41 sec
Images / sec:      314



## Add XLA

In [None]:
# Add XLA JIT compilation on top of tf.function.
for batch_size in (32, 64, 128):
  print("Batch size: {}".format(batch_size))
  train_model(make_jpg_dataset, global_batch_size=batch_size,
              use_tf_function=True, xla=True)
  print()

Batch size: 32
Burn in complete.
30 steps
Mean step time:   0.04 sec
Images / sec:      817

Batch size: 64
Burn in complete.
30 steps
Mean step time:   0.07 sec
Images / sec:      930

Batch size: 128
Burn in complete.
30 steps
Mean step time:   0.13 sec
Images / sec:     1001



## Add mixed precision

In [None]:
# Add the mixed_float16 policy; batch size 256 joins the sweep.
for batch_size in (32, 64, 128, 256):
  print("Batch size: {}".format(batch_size))
  train_model(make_jpg_dataset, global_batch_size=batch_size,
              use_tf_function=True, xla=True, mixed_precision=True)
  print()

Batch size: 32
Burn in complete.
30 steps
Mean step time:   0.04 sec
Images / sec:      813

Batch size: 64
Burn in complete.
30 steps
Mean step time:   0.05 sec
Images / sec:     1321

Batch size: 128
Burn in complete.
30 steps
Mean step time:   0.08 sec
Images / sec:     1653

Batch size: 256
Burn in complete.
30 steps
Mean step time:   0.13 sec
Images / sec:     1904



## Add distribution strategy and various tuning knobs.

In [None]:
# Single-GPU MirroredStrategy runs: dynamic vs. fixed (128) loss scale, each
# with and without the hand-tuned knobs.
for batch_size in [256]:
  print("Batch size: {}".format(batch_size))
  for extra_options in (
      {},
      {"loss_scale": 128},
      {"tuned": True},
      {"loss_scale": 128, "tuned": True},
  ):
    train_model(make_jpg_dataset, batch_size,
                strategy=tf.distribute.MirroredStrategy(["GPU:0"]),
                use_tf_function=True, xla=True, mixed_precision=True,
                collect_profile=False, **extra_options)
    print()

Batch size: 256
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/r

## Move to multi device

In order for the input pipeline to keep up, we have to switch to already resized images and move the random augmentation onto the GPU, as the CPU cannot process the augmentation functions quickly enough even at 100% utilization. We also switch to using TFRecords instead of raw JPEGs.

In [None]:
# Scale out at 256 examples per GPU: pre-resized TFRecords, augmentation on device.
for num_gpus in (1, 2, 4, 8):
  batch_size = 256 * num_gpus
  print("Batch size: {}".format(batch_size))
  devices = ["GPU:{}".format(i) for i in range(num_gpus)]
  train_model(
      functools.partial(make_dataset, already_resized=True),
      batch_size,
      strategy=tf.distribute.MirroredStrategy(devices),
      use_tf_function=True,
      xla=True,
      mixed_precision=True,
      transform_on_device=True,
  )
  print()

Batch size: 256
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Burn in complete.
30 steps
Mean step time:   0.11 sec
Images / sec:     2268

Batch size: 512
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')
INFO:tensorflow:batch_all_reduce: 158 all-reduces with algorithm = nccl, num_packs = 1, agg_small_grads_max_bytes = 0 and agg_small_grads_max_group = 10
INFO:tensorflow:batch_all_reduce: 158 all-reduces with algorithm = nccl, num_packs = 1, agg_small_grads_max_bytes = 0 and agg_small_grads_max_group = 10
Burn in complete.
30 steps
Mean step time:   0.12 sec
Images / sec:     4269

Batch size: 1024
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:G