In [1]:
# Understanding TensorFlow Input Pipeline and optimizing it.
import tensorflow as tf

In [6]:
# Goal of this notebook: analyse how dataset ingestion works (reading the dataset,
# preprocessing it and making it available for training) and how to make it efficient.
# Every input pipeline starts from a data source.

# 1. Download the dataset and store it as a local file.
data_url = 'https://storage.googleapis.com/mledu-datasets/sparse-data-embedding/train.tfrecord'

# The last path component of the URL is used as the local file name.
file_name = data_url.rsplit('/', 1)[-1]
print(file_name)

# get_file is given the local file name and the URL to fetch it from; its return
# value is kept in `files` and later handed to tf.data.TFRecordDataset.
files = tf.keras.utils.get_file(file_name, data_url)


train.tfrecord


In [13]:
# create a parsing function which will transform each data record
def parse_func(record):
  """Parse one serialized example and return its label.

  Args:
    record: The value handed to tf.io.parse_single_example, i.e. a single
      serialized example (one element of the TFRecordDataset), not a file path.

  Returns:
    The "labels" feature, parsed as a fixed-length feature of shape [1] and
    dtype tf.float32.
  """
  feature_spec = {
    "terms": tf.io.VarLenFeature(dtype=tf.string),
    "labels": tf.io.FixedLenFeature(shape=[1], dtype=tf.float32),
  }

  # "terms" is parsed together with "labels", but only the label is returned.
  return tf.io.parse_single_example(record, feature_spec)['labels']

In [18]:

# Build the input pipeline as one chain; every call returns a new tf.data.Dataset,
# and a tf.data.Dataset can be consumed like any Python iterable.
dataset = (
    # Source: read the records of one or more TFRecord files sequentially.
    tf.data.TFRecordDataset(files)
    # Shuffle, since the data may come from various different sources. This
    # transformation maintains a fixed-size buffer and chooses the next element
    # uniformly at random from that buffer.
    .shuffle(buffer_size=100)
    # Preprocessing step: apply parse_func to every record. Other transformations
    # such as filter and flat_map are available as well.
    .map(parse_func)
    # The simplest form of batching: stack n (here 32) consecutive elements of the
    # dataset into a single element.
    .batch(batch_size=32)
)

# Each step above only creates a new Dataset object that describes the pipeline;
# the data is actually read and processed when the dataset is iterated.
# Note: Since the buffer_size is 100, and the batch size is 32, the first batch
# contains no elements with an index over 132.

In [19]:
# A tf.data.Dataset is a Python iterable, so a plain for loop can walk over its elements.
# The loop obtains a single iterator and pulls elements from it one after another;
# once the loop is finished, that iterator is released.
# Each element is one batch of up to 32 parsed labels; take(1) limits the loop to
# the first batch only.

for record in dataset.take(1):
  print(record)

tf.Tensor(
[[0.]
 [0.]
 [1.]
 [0.]
 [1.]
 [1.]
 [0.]
 [0.]
 [0.]
 [0.]
 [1.]
 [0.]
 [0.]
 [1.]
 [0.]
 [1.]
 [1.]
 [1.]
 [0.]
 [0.]
 [0.]
 [1.]
 [1.]
 [0.]
 [0.]
 [0.]
 [1.]
 [0.]
 [0.]
 [1.]
 [0.]
 [0.]], shape=(32, 1), dtype=float32)


In [None]:
# The tf.data API enables you to build complex input pipelines from simple, reusable pieces.
# GPUs and TPUs can radically reduce the time required to execute a single training step
# thanks to their parallelism, so the input pipeline must deliver data fast enough to keep them busy.

# There is scope for optimizing the data pipeline created above:

# 1. software pipelining  =>  prefetch(buffer_size = X)
# 2. processing parallelization => map(...., num_parallel_calls = N)
# 3. I/O parallelization => Interleave (....., num_parallel_calls = N)


# Let's first understand the traditional (naive) method:
# In a naive synchronous implementation like the one above, while your pipeline is fetching the data,
# your model is sitting idle. Conversely, while your model is training,
# the input pipeline is sitting idle. The training step time is thus the sum of opening,
# reading and training times.

'''
e.g:

1. shuffle  -> takes 20ms.
2. map  -> takes 200ms
3. filter  -> takes 300ms

So, total time It will take to process all is => 20 + 200 + 300 = 520ms

'''


In [None]:
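# In contrast, if we parallelize the transformations and overlap the pipeline stages:

'''
e.g :
1. shuffle  -> 20ms
2. map(..., num_parallel_calls = N)  -> takes 200ms per element, but up to N elements are processed at the same time
3. filter(...)  -> takes 300ms  (note: filter does not accept a num_parallel_calls argument)

When the stages overlap (e.g. by adding prefetch), the time per element in the ideal case is bounded
by the slowest stage (about 300ms here) instead of the sum of all stages (520ms).
Note that num_parallel_calls = 1 gives no parallelism; use a value greater than 1 or AUTOTUNE.

'''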

# Therefore, num_parallel_calls can help reduce the time consumed by the overall computation.
"""
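Choosing the best value for the num_parallel_calls argument depends on your hardware,
characteristics of your training data (such as its size and shape), the cost of your map
function, and what other processing is happening on the CPU at the same time.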

"""

# But, A simple heuristic is to use the number of available CPU cores.
# We can use num_parallel_calls = AUTOTUNE, which will delegate the decision
# about what level of parallelism to use to the tf.data runtime.

In [None]:
# Caching
# The tf.data.Dataset.cache transformation can cache a dataset, either in memory
# or on local storage. This will save some operations (like file opening and data reading)
# from being executed during each epoch.

"""
If the user-defined function passed into the map transformation is expensive, apply
 the cache transformation after the map transformation as long as the resulting
 dataset can still fit into memory or local storage.
 """

In [None]:
# Reducing memory footprint
# A number of transformations, including interleave, prefetch, and shuffle, maintain
# an internal buffer of elements.

"""
 If the user-defined function passed into the map transformation changes the size of the elements,
  then the ordering of the map transformation and the transformations that buffer elements
  affects the memory usage. In general, choose the order that results in lower memory footprint,
  unless different ordering is desirable for performance.
  """

In [None]:
# As per Tensorflow official documentation page

"""
Here is a summary of the best practices for designing performant TensorFlow input pipelines:

-> Use the prefetch transformation to overlap the work of a producer and consumer
-> Parallelize the data reading transformation using the interleave transformation
-> Parallelize the map transformation by setting the num_parallel_calls argument
-> Use the cache transformation to cache data in memory during the first epoch
-> Vectorize user-defined functions passed in to the map transformation
-> Reduce memory usage when applying the interleave, prefetch, and shuffle transformations

"""