In [19]:
import numpy as np
import tensorflow as tf


# A typical TensorFlow training input pipeline can be framed as an ETL process:

# Extract: Read data from persistent storage -- either local (e.g. HDD or SSD) or remote (e.g. GCS or HDFS).

# Transform: Use CPU cores to parse and perform preprocessing operations on the data such as image decompression, 
#     data augmentation transformations (such as random crop, flips, and color distortions), shuffling, and batching.

# Load: Load the transformed data onto the accelerator device(s) (for example, GPU(s) or TPU(s)) that execute the 
#     machine learning model.


# The tf.data API provides users with building blocks to design input pipelines that effectively utilize the CPU, 
#     optimizing each step of the ETL process.



# Pipelining overlaps the preprocessing and model execution of a training step. While the accelerator is performing 
#     training step N, the CPU is preparing the data for step N+1




np.set_printoptions(linewidth=150, precision=3, suppress=True)


M = 10
d = 2
# samples
X = tf.constant(np.random.randn(M, d), 'float32')
# ids of samples, say each sample have different id
Y = tf.constant(np.expand_dims(np.random.permutation(M), 1), 'float32')

dataset_items = (X,Y)
# first dimensions must match
# also they should be at least 2 rank
first_dims = [item.shape.as_list()[0] for item in dataset_items]
assert np.all(np.equal(first_dims, first_dims[0]))

batch_size = 2
n_epochs = 5

print(X)





dataset = tf.data.Dataset.from_tensor_slices((X, Y))

# buffer_size: A tf.int64 scalar tf.Tensor, representing the number of elements from this 
# dataset from which the new dataset will sample. Defaults to reshuffle each iteration over dataset
dataset = dataset.shuffle(buffer_size=M)


## Doing pararell preprocessing of data using map function
def func(X, Y):
  X += 10
  return X, Y

dataset = dataset.map(func)


#dataset = dataset.map(map_func=parse_fn, num_parallel_calls=FLAGS.num_parallel_calls)
#dataset = dataset.batch(batch_size=FLAGS.batch_size)

# If batch size is large, fuse map and batch function
# dataset = dataset.apply(
#             tf.contrib.data.map_and_batch(map_func=parse_fn, batch_size=FLAGS.batch_size))




# Combines consecutive elements of this dataset into batches.
# If your program depends on the batches having the same outer dimension, you should set the drop_remainder 
# argument to True to prevent the smaller batch from being produced.
dataset = dataset.batch(batch_size, drop_remainder=True)




# #The simplest way to iterate over a dataset in multiple epochs is to use the Dataset.repeat() transformation
# # Applying the Dataset.repeat() transformation with no arguments will repeat the input indefinitely
# dataset = dataset.repeat(n_epochs)


# The tf.data API provides a software pipelining mechanism through the tf.data.Dataset.prefetch transformation, which
# can be used to decouple the time data is produced from the time it is consumed. To achieve the pipelining effect 
# you can add prefetch(1) as the final transformation to your dataset pipeline (or prefetch(n) if a single training 
# step consumes n elements).

dataset = dataset.prefetch(2)



# training_iterator = training_dataset.make_one_shot_iterator()
# validation_iterator = validation_dataset.make_initializable_iterator()
# next_train_batch = training_iterator.get_next()
# next_test_batch = validation_iterator.get_next()

# sess.run(training_iterator.initializer)


# while True:
#   for _ in range(200):
#     sess.run(next_train_batch)

#   # Run one pass over the validation dataset.
#   sess.run(validation_iterator.initializer)
#   for _ in range(50):
#     sess.run(next_test_batch)


dataset_iterator = dataset.make_initializable_iterator()
next_batch = dataset_iterator.get_next()

sess = tf.Session()
sess.run(tf.global_variables_initializer())
sess.run(dataset_iterator.initializer)

occurrence = np.zeros([M], 'int32')

it = 0


# Compute for n epochs.
for _ in range(1):
    sess.run(dataset_iterator.initializer)
    while True:
        try:
            it += 1
            xb, yb = sess.run(next_batch)
            occurrence[np.int32(yb)] += 1
            print('%03d, x:%s, y:%s ' % (it, str(xb.ravel()), str(yb.ravel())))
        except tf.errors.OutOfRangeError:
            print('end of dataset')
            break

   
sess.close()

print('occurrence array:', occurrence) # all entries should be M, indicating each sample is fetched M times

Tensor("Const_34:0", shape=(10, 2), dtype=float32)
001, x:[11.529  8.587  9.479  9.592], y:[9. 5.] 
002, x:[ 8.637  9.138  8.935 10.369], y:[3. 2.] 
003, x:[ 9.46  10.829 10.699 10.724], y:[1. 7.] 
004, x:[ 8.713 10.339 11.304 10.746], y:[0. 6.] 
005, x:[10.623 10.578 10.143  9.266], y:[8. 4.] 
006, x:[ 8.935 10.369 10.623 10.578], y:[2. 8.] 
007, x:[ 8.713 10.339 10.699 10.724], y:[0. 7.] 
008, x:[9.479 9.592 8.637 9.138], y:[5. 3.] 
009, x:[10.143  9.266 11.529  8.587], y:[4. 9.] 
010, x:[ 9.46  10.829 11.304 10.746], y:[1. 6.] 
011, x:[ 8.637  9.138  9.46  10.829], y:[3. 1.] 
012, x:[10.623 10.578 11.304 10.746], y:[8. 6.] 
013, x:[10.143  9.266  9.479  9.592], y:[4. 5.] 
014, x:[ 8.935 10.369 10.699 10.724], y:[2. 7.] 
015, x:[11.529  8.587  8.713 10.339], y:[9. 0.] 
016, x:[10.699 10.724 11.529  8.587], y:[7. 9.] 
017, x:[10.623 10.578 10.143  9.266], y:[8. 4.] 
018, x:[11.304 10.746  9.46  10.829], y:[6. 1.] 
019, x:[ 8.713 10.339  8.935 10.369], y:[0. 2.] 
020, x:[9.479 9.592 8.

### Links to tf.data and input pipeline performance
https://www.tensorflow.org/guide/datasets
https://www.tensorflow.org/performance/datasets_performance


In [3]:
# SAVING AND RESTORING ITERATOR
# Create saveable object from iterator.
# saveable = tf.contrib.data.make_saveable_from_iterator(iterator)

# # Save the iterator state by adding it to the saveable objects collection.
# tf.add_to_collection(tf.GraphKeys.SAVEABLE_OBJECTS, saveable)
# saver = tf.train.Saver()

# with tf.Session() as sess:

#   if should_checkpoint:
#     saver.save(path_to_checkpoint)

# # Restore the iterator state.
# with tf.Session() as sess:
#   saver.restore(sess, path_to_checkpoint)

## Reading input data

In [4]:
# NUMPY arrays


# Load the training data into two NumPy arrays, for example using `np.load()`.
with np.load("/var/data/training_data.npy") as data:
  features = data["features"]
  labels = data["labels"]

# Assume that each row of `features` corresponds to the same row as `labels`.
assert features.shape[0] == labels.shape[0]

dataset = tf.data.Dataset.from_tensor_slices((features, labels))





# If all of your input data fit in memory, the simplest way to create a Dataset from them is to convert them to 
# tf.Tensor objects and use Dataset.from_tensor_slices().

# Load the training data into two NumPy arrays, for example using `np.load()`.
with np.load("/var/data/training_data.npy") as data:
  features = data["features"]
  labels = data["labels"]

# Assume that each row of `features` corresponds to the same row as `labels`.
assert features.shape[0] == labels.shape[0]

features_placeholder = tf.placeholder(features.dtype, features.shape)
labels_placeholder = tf.placeholder(labels.dtype, labels.shape)

dataset = tf.data.Dataset.from_tensor_slices((features_placeholder, labels_placeholder))
# [Other transformations on `dataset`...]
dataset = ...
iterator = dataset.make_initializable_iterator()

sess.run(iterator.initializer, feed_dict={features_placeholder: features,
                                          labels_placeholder: labels})


FileNotFoundError: [Errno 2] No such file or directory: '/var/data/training_data.npy'

In [6]:
# CSV DATA
# Creates a dataset that reads all of the records from two CSV files, each with
# eight float columns
filenames = ["/var/data/file1.csv", "/var/data/file2.csv"]
record_defaults = [tf.float32] * 8   # Eight required float columns
dataset = tf.contrib.data.CsvDataset(filenames, record_defaults)

# Creates a dataset that reads all of the records from two CSV files, each with
# four float columns which may have missing values
record_defaults = [[0.0]] * 8
dataset = tf.contrib.data.CsvDataset(filenames, record_defaults)

# Creates a dataset that reads all of the records from two CSV files with
# headers, extracting float data from columns 2 and 4.
record_defaults = [[0.0]] * 2  # Only provide defaults for the selected columns
dataset = tf.contrib.data.CsvDataset(filenames, record_defaults, header=True, select_cols=[2,4])



##  Preproceccing with Sataset.map()


In [8]:
# Reads an image from a file, decodes it into a dense tensor, and resizes it
# to a fixed shape.
def _parse_function(filename, label):
  image_string = tf.read_file(filename)
  image_decoded = tf.image.decode_jpeg(image_string)
  image_resized = tf.image.resize_images(image_decoded, [28, 28])
  return image_resized, label

# A vector of filenames.
filenames = tf.constant(["/var/data/image1.jpg", "/var/data/image2.jpg", ...])

# `labels[i]` is the label for the image in `filenames[i].
labels = tf.constant([0, 37, ...])

dataset = tf.data.Dataset.from_tensor_slices((filenames, labels))
dataset = dataset.map(_parse_function)

TypeError: Failed to convert object of type <class 'list'> to Tensor. Contents: ['/var/data/image1.jpg', '/var/data/image2.jpg', Ellipsis]. Consider casting elements to a supported type.

## Simple batching

In [9]:
inc_dataset = tf.data.Dataset.range(100)
dec_dataset = tf.data.Dataset.range(0, -100, -1)
dataset = tf.data.Dataset.zip((inc_dataset, dec_dataset))
batched_dataset = dataset.batch(4)

iterator = batched_dataset.make_one_shot_iterator()
next_element = iterator.get_next()

print(sess.run(next_element))  # ==> ([0, 1, 2,   3],   [ 0, -1,  -2,  -3])
print(sess.run(next_element))  # ==> ([4, 5, 6,   7],   [-4, -5,  -6,  -7])
print(sess.run(next_element))  # ==> ([8, 9, 10, 11],   [-8, -9, -10, -11])

RuntimeError: Attempted to use a closed Session.

## Training


In [None]:
# If you want to receive a signal at the end of each epoch, you can write a training loop that catches the 
# tf.errors.OutOfRangeError at the end of a dataset. At that point you might collect some statistics 
# (e.g. the validation error) for the epoch.

filenames = ["/var/data/file1.tfrecord", "/var/data/file2.tfrecord"]
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.map(...)
dataset = dataset.batch(32)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()

# Compute for 100 epochs.
for _ in range(100):
  sess.run(iterator.initializer)
  while True:
    try:
      sess.run(next_element)
    except tf.errors.OutOfRangeError:
      break

  # [Perform end-of-epoch calculations here.]