## A large portion of this code is taken from Aurélien Géron's Hands-On Machine Learning with Scikit-Learn, Keras, and TensorFlow (2nd edition). I have added comments based on the book and on information I found elsewhere.

## Chapter 13

The Data API (tf.data) allows you to ingest and preprocess a large dataset efficiently. It also works seamlessly with tf.keras.

Off-the-shelf, it can read from:
  - text files
  - binary files with fixed-size records
  - binary files using TensorFlow's TFRecord format
  - SQL databases
Many open-source extensions are available to read from other data sources (see TensorFlow I/O: https://github.com/tensorflow/io and https://www.tensorflow.org/io/api_docs/python/tfio)
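
As a small illustration of the text-file case, the sketch below reads a file line by line with `tf.data.TextLineDataset`. The file name and its contents are hypothetical; they are written to a temporary directory only so the example is self-contained.

```python
import os
import tempfile

import tensorflow as tf

# Write a tiny throwaway text file (hypothetical sample data).
tmp_dir = tempfile.mkdtemp()
sample_path = os.path.join(tmp_dir, "sample.txt")
with open(sample_path, "w") as f:
    f.write("first line\nsecond line\nthird line\n")

# TextLineDataset yields one scalar string tensor per line of the file,
# without the trailing newline.
text_dataset = tf.data.TextLineDataset(sample_path)
lines = [item.numpy().decode("utf-8") for item in text_dataset]
print(lines)
```

The dataset reads the file lazily as it is iterated, so the same pattern should carry over to files that do not fit in RAM.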

This notebook covers:
  - the Data API
  - the TFRecord format
  - how to create custom preprocessing layers
  - how to use the standard Keras layers
  - projects from the TF ecosystem
TF Transform (tf.Transform):
  - Write a single preprocessing function.
  - Run it in batch mode on your full training set before training
    (to speed training up).
  - Export it to a TF Function and incorporate it in your trained model,
    so that once the model is deployed in production it can
    take care of preprocessing new instances on the fly.
TF Datasets (TFDS):
  - Provides:
    - a convenient function to download many common datasets
    - convenient dataset objects (tf.data.Dataset) to manipulate the datasets
      using the Data API

In [1]:
import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow import keras

import sklearn
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

## The Data API

In [53]:
X = tf.range(10)
X

<tf.Tensor: id=950, shape=(10,), dtype=int32, numpy=array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=int32)>

In [54]:
dataset = tf.data.Dataset.from_tensor_slices(X)  # creates dataset in RAM
dataset

<TensorSliceDataset shapes: (), types: tf.int32>

In [55]:
def print_dataset(dataset, comment=''):
    print(comment)
    for item in dataset:
        print(item)

print_dataset(dataset, '')


tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


In [56]:
# Dataset transformation methods return a new dataset,
# so you can chain transformations.
# They never modify the dataset they are called on,
# so write dataset = dataset.method() rather than a bare
# dataset.method(); otherwise the result is discarded.

# repeat(3) does not use 3x the RAM of the original dataset.
dataset1 = dataset.repeat(3). \
            batch(7)                       # by default, drop_remainder=False
dataset2 = dataset.repeat(3). \
            batch(7, drop_remainder=True)  # drops the final batch if it has fewer than 7 items
print_dataset(dataset1, 'In batch() function, drop_remainder=False (default) :')
print_dataset(dataset2, 'In batch() function, drop_remainder=True :')

In batch() function, drop_remainder=False (default) :
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)
In batch() function, drop_remainder=True :
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
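
The batch counts above follow from simple arithmetic: 10 items repeated 3 times give 30 items, so batches of 7 yield ceil(30 / 7) = 5 batches when the remainder is kept and floor(30 / 7) = 4 when it is dropped. A minimal sketch that checks this by counting batches (the variable names are illustrative, not from the book):

```python
import math

import tensorflow as tf

n_items, n_repeats, batch_size = 10, 3, 7
total = n_items * n_repeats  # 30 items after repeat(3)

base = tf.data.Dataset.range(n_items).repeat(n_repeats)
kept = base.batch(batch_size)                          # keeps the short final batch
dropped = base.batch(batch_size, drop_remainder=True)  # discards the short final batch

# Count batches by iterating, which works on any TF 2.x release.
n_kept = sum(1 for _ in kept)
n_dropped = sum(1 for _ in dropped)
print(n_kept, math.ceil(total / batch_size))
print(n_dropped, total // batch_size)
```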


In [57]:
dataset = tf.data.Dataset.from_tensor_slices(X)  # creates dataset in RAM
                                                 # Items: [0, 1, 2, 3, 4,  5,  6,  7,  8,  9]
dataset2 = dataset.map(lambda x: x * 2)          # Items: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# map() is typically used for preprocessing.

# The num_parallel_calls argument tells map() how many
# items to process in parallel.
# AUTOTUNE lets tf.data pick that number dynamically,
# based on the available CPU.
dataset2 = dataset.map(lambda x: x * 2, 
                       num_parallel_calls=tf.data.experimental.AUTOTUNE)
print_dataset(dataset2, '')


tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
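
One point worth checking is that a parallel map() still returns items in their original order, which is the default behavior as far as I know. The sketch below compares a sequential and a parallel map over the same source; the `AUTOTUNE` fallback is an assumption made so the code runs on both older and newer TF 2.x releases.

```python
import tensorflow as tf

# tf.data.AUTOTUNE is the newer name; older releases only expose
# the tf.data.experimental alias used elsewhere in this notebook.
if hasattr(tf.data, "AUTOTUNE"):
    AUTOTUNE = tf.data.AUTOTUNE
else:
    AUTOTUNE = tf.data.experimental.AUTOTUNE

source = tf.data.Dataset.range(10)
sequential = source.map(lambda x: x * 2)
parallel = source.map(lambda x: x * 2, num_parallel_calls=AUTOTUNE)

seq_values = [int(v.numpy()) for v in sequential]
par_values = [int(v.numpy()) for v in parallel]
print(seq_values == par_values)
```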


The map() function applies a transformation to each item. Its properties:

|If the function given to map() returns|result.output_classes|result.output_types|result.output_shapes|
|---|---|---|---|
|tf.constant(37.0)|tf.Tensor|tf.float32|[] (scalar)|
|tf.constant(37.0), tf.constant(["Foo", "Bar", "Baz"])|(tf.Tensor, tf.Tensor)|(tf.float32, tf.string)|([], [3])|
|37.0, ["Foo", "Bar", "Baz"], np.array([1.0, 2.0], dtype=np.float64)|(tf.Tensor, tf.Tensor, tf.Tensor)|(tf.float32, tf.string, tf.float64)|([], [3], [2])|
|{"a": 37.0, "b": [42, 16]}, "foo"|({"a": tf.Tensor, "b": tf.Tensor}, tf.Tensor)|({"a": tf.float32, "b": tf.int32}, tf.string)|({"a": [], "b": [2]}, [])|
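
In TF 2.x, the `element_spec` property of a dataset reports the same type and shape information in one place. A minimal sketch, mirroring the second row of the table (the lambda is only an illustration):

```python
import tensorflow as tf

source = tf.data.Dataset.range(3)

# Return a (float scalar, string vector of length 3) pair for each item.
pairs = source.map(
    lambda x: (tf.cast(x, tf.float32), tf.constant(["Foo", "Bar", "Baz"]))
)

# element_spec is a structure of tf.TensorSpec objects,
# one per component returned by the mapped function.
print(pairs.element_spec)
```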

The apply() function applies a transformation to the whole dataset.
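
To make the contrast with map() concrete: apply() takes a function that receives the dataset itself and returns a new dataset. The helper name below is hypothetical; it simply bundles two per-item steps into one dataset-level transformation.

```python
import tensorflow as tf

def keep_even_then_double(ds):
    """Dataset-level transformation: takes a Dataset, returns a Dataset."""
    return ds.filter(lambda x: x % 2 == 0).map(lambda x: x * 2)

source = tf.data.Dataset.range(10)
result = source.apply(keep_even_then_double)

values = [int(v.numpy()) for v in result]
print(values)  # [0, 4, 8, 12, 16]
```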

In [58]:
dataset1 = dataset.repeat(3).batch(7)

print_dataset(dataset1, 'Before unbatch():')
# unbatch() is passed to apply() here; TF 2.x also offers dataset1.unbatch() directly.
dataset2 = dataset1.apply(tf.data.experimental.unbatch())
print_dataset(dataset2, 'After unbatch():')

dataset3 = dataset2.filter(lambda x: x < 5)
print_dataset(dataset3, 'After filter():')

dataset4 = dataset2.take(3)
print_dataset(dataset4, 'After take(3):')

Before unbatch():
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)
After unbatch():
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)

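## Shuffling the data

By default, the Keras fit() method has a shuffle=True parameter.
This shuffles the training data before each epoch when the data is
passed as arrays; the argument is ignored for a tf.data.Dataset, which
must be shuffled with its own shuffle() method instead.
Shuffling ensures the neural network is not tuning itself to
the sequence of inputs in the training set.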

In [62]:
# You can shuffle the dataset using the shuffle() function.
# The buffer_size argument creates a buffer between the client
# of the shuffle() function and the source dataset:
#   client <- shuffle() <- buffer <- source dataset
# shuffle() pre-populates the buffer from the source dataset.
# When asked for an item, it returns one at random from the buffer
# and refills the emptied slot from the source dataset,
# until there are no more items in the source dataset.
# If buffer_size is too small, the output stays close to the
# original order, so make it large enough to be effective.
dataset = tf.data.Dataset.range(10).repeat(3) # 0 to 9, three times
dataset = dataset.shuffle(buffer_size=5, 
                          seed=42,
                          reshuffle_each_iteration=False).batch(7)
print_dataset(dataset)


tf.Tensor([2 5 1 7 0 8 9], shape=(7,), dtype=int64)
tf.Tensor([0 4 3 1 5 6 4], shape=(7,), dtype=int64)
tf.Tensor([6 7 2 9 0 3 3], shape=(7,), dtype=int64)
tf.Tensor([2 8 4 6 8 9 7], shape=(7,), dtype=int64)
tf.Tensor([5 1], shape=(2,), dtype=int64)
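
The buffer mechanism has a consequence that can be checked directly: with a buffer of size B, the item returned at position i can only come from the first i + B source items, so a small buffer keeps items close to their original position. The sketch below tests this under the fill-then-sample behavior described in the comments above; the variable names are illustrative.

```python
import tensorflow as tf

buffer_size = 5
source = tf.data.Dataset.range(100)

shuffled = [int(v.numpy()) for v in source.shuffle(buffer_size, seed=42)]
# With a buffer of one item there is nothing to choose from, so the order is unchanged.
unshuffled = [int(v.numpy()) for v in source.shuffle(buffer_size=1)]

# Every item appears exactly once.
is_permutation = sorted(shuffled) == list(range(100))
# Item at output position i was drawn from source items 0 .. i + buffer_size - 1.
stays_local = all(v <= i + buffer_size - 1 for i, v in enumerate(shuffled))
print(is_permutation, stays_local, unshuffled == list(range(100)))
```

This is why a buffer much smaller than the dataset gives only a local shuffle.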


In [64]:
dataset2 = dataset.repeat(2)
print_dataset(dataset2)      # repeat() on a shuffled dataset generates a new order
                             # at each iteration, unless reshuffle_each_iteration=False
                             # in the shuffle() function.


tf.Tensor([2 5 1 7 0 8 9], shape=(7,), dtype=int64)
tf.Tensor([0 4 3 1 5 6 4], shape=(7,), dtype=int64)
tf.Tensor([6 7 2 9 0 3 3], shape=(7,), dtype=int64)
tf.Tensor([2 8 4 6 8 9 7], shape=(7,), dtype=int64)
tf.Tensor([5 1], shape=(2,), dtype=int64)
tf.Tensor([2 5 1 7 0 8 9], shape=(7,), dtype=int64)
tf.Tensor([0 4 3 1 5 6 4], shape=(7,), dtype=int64)
tf.Tensor([6 7 2 9 0 3 3], shape=(7,), dtype=int64)
tf.Tensor([2 8 4 6 8 9 7], shape=(7,), dtype=int64)
tf.Tensor([5 1], shape=(2,), dtype=int64)
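
The repeated order seen above can be verified without reading the output by eye. A minimal sketch, assuming the same seed and `reshuffle_each_iteration=False` as in the cell above, that splits the repeated dataset into its two epochs and compares them:

```python
import tensorflow as tf

fixed = (
    tf.data.Dataset.range(10)
    .shuffle(buffer_size=5, seed=42, reshuffle_each_iteration=False)
    .repeat(2)
)

values = [int(v.numpy()) for v in fixed]
first_epoch, second_epoch = values[:10], values[10:]

# With reshuffle_each_iteration=False both epochs use the same shuffled order.
print(first_epoch == second_epoch)
```

With `reshuffle_each_iteration=True` (the default), the two epochs would normally come out in different orders.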


In [None]:
# The Linux shuf command shuffles the lines of a file.
# On macOS:
#   brew install coreutils
#   gshuf                  # GNU shuf, installed with a "g" prefix
# See man shuf (or man gshuf) for usage.

### Interleaving from multiple files

Let's figure out how to shuffle using the California Housing dataset.

In [67]:
housing = fetch_california_housing()
housing

{'data': array([[   8.3252    ,   41.        ,    6.98412698, ...,    2.55555556,
           37.88      , -122.23      ],
        [   8.3014    ,   21.        ,    6.23813708, ...,    2.10984183,
           37.86      , -122.22      ],
        [   7.2574    ,   52.        ,    8.28813559, ...,    2.80225989,
           37.85      , -122.24      ],
        ...,
        [   1.7       ,   17.        ,    5.20554273, ...,    2.3256351 ,
           39.43      , -121.22      ],
        [   1.8672    ,   18.        ,    5.32951289, ...,    2.12320917,
           39.43      , -121.32      ],
        [   2.3886    ,   16.        ,    5.25471698, ...,    2.61698113,
           39.37      , -121.24      ]]),
 'target': array([4.526, 3.585, 3.521, ..., 0.923, 0.847, 0.894]),
 'feature_names': ['MedInc',
  'HouseAge',
  'AveRooms',
  'AveBedrms',
  'Population',
  'AveOccup',
  'Latitude',
  'Longitude'],
 'DESCR': '.. _california_housing_dataset:\n\nCalifornia Housing dataset\n--------------------