# TensorFlow Dataset API

* use tf.data to read data from memory
* `tf.data.Dataset.from_tensor_slices()`
* use tf.data in a training loop
* use tf.data to read data from disk
* `tf.data.experimental.make_csv_dataset`
* `tf.data.experimental.make_batched_features_dataset` (for TFRecords)
* write production input pipelines with feature engineering (batching, shuffling, etc.)

In [15]:
import json
import math
import os
from pprint import pprint # pretty print

import numpy as np
import tensorflow as tf
print(tf.version.VERSION)

2.4.1


## Loading data from memory

### Creating a synthetic dataset

In [2]:
N_POINTS = 10
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10

We begin with implementing a function that takes as input


- our $X$ and $Y$ vectors of synthetic data generated by the linear function $y= 2x + 10$
- the number of passes over the dataset we want to train on (`epochs`)
- the size of the batches the dataset (`batch_size`)

and returns a `tf.data.Dataset`: 

**Remark:** Note that the last batch may not contain the exact number of elements you specified because the dataset was exhausted.

If you want batches with the exact same number of elements per batch, we will have to discard the last batch by
setting:

```python
dataset = dataset.batch(batch_size, drop_remainder=True)
```

In [3]:
# Using the tf.data.Dataset.from_tensor_slices() method we are able to get the slices of list or array

def create_dataset(X, Y, epochs, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    dataset = dataset.repeat(epochs).batch(batch_size, drop_remainder=True)
    return dataset

Let's test our function by iterating twice over our dataset in batches of 3 datapoints:

In [8]:
BATCH_SIZE = 3
EPOCH = 2

dataset = create_dataset(X, Y, epochs=1, batch_size=3)

for i, (x, y) in enumerate(dataset):
    print("x:", x.numpy(), "y:", y.numpy())
    assert len(x) == BATCH_SIZE
    assert len(y) == BATCH_SIZE
assert  EPOCH

x: [0. 1. 2.] y: [10. 12. 14.]
x: [3. 4. 5.] y: [16. 18. 20.]
x: [6. 7. 8.] y: [22. 24. 26.]


### Loss function and gradients

In [9]:
def loss_mse(X, Y, w0, w1):
    Y_hat = w0 * X + w1
    errors = (Y_hat - Y)**2
    return tf.reduce_mean(errors)


def compute_gradients(X, Y, w0, w1):
    with tf.GradientTape() as tape:
        loss = loss_mse(X, Y, w0, w1)
    return tape.gradient(loss, [w0, w1])

### Training loop

In the traning loop, iterate directly on the `tf.data.Dataset` generated by our `create_dataset` function. 

In [10]:
EPOCHS = 250
BATCH_SIZE = 2
LEARNING_RATE = .02

MSG = "STEP {step} - loss: {loss}, w0: {w0}, w1: {w1}\n"

w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)

dataset = create_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)

for step, (X_batch, Y_batch) in enumerate(dataset):

    dw0, dw1 = compute_gradients(X_batch, Y_batch, w0, w1)
    w0.assign_sub(dw0 * LEARNING_RATE)
    w1.assign_sub(dw1 * LEARNING_RATE)

    if step % 100 == 0:
        loss = loss_mse(X_batch, Y_batch, w0, w1)
        print(MSG.format(step=step, loss=loss, w0=w0.numpy(), w1=w1.numpy()))
        
assert loss < 0.0001
assert abs(w0 - 2) < 0.001
assert abs(w1 - 10) < 0.001

STEP 0 - loss: 109.76800537109375, w0: 0.23999999463558197, w1: 0.4399999976158142

STEP 100 - loss: 9.363959312438965, w0: 2.55655837059021, w1: 6.674341678619385

STEP 200 - loss: 1.393267273902893, w0: 2.2146825790405273, w1: 8.717182159423828

STEP 300 - loss: 0.20730558037757874, w0: 2.082810878753662, w1: 9.505172729492188

STEP 400 - loss: 0.03084510937333107, w0: 2.03194260597229, w1: 9.809128761291504

STEP 500 - loss: 0.004589457996189594, w0: 2.012321710586548, w1: 9.926374435424805

STEP 600 - loss: 0.0006827632314525545, w0: 2.0047526359558105, w1: 9.971602439880371

STEP 700 - loss: 0.00010164897685172036, w0: 2.0018346309661865, w1: 9.989042282104492

STEP 800 - loss: 1.5142451957217418e-05, w0: 2.000706911087036, w1: 9.995771408081055

STEP 900 - loss: 2.256260358990403e-06, w0: 2.0002737045288086, w1: 9.998367309570312

STEP 1000 - loss: 3.3405058275093324e-07, w0: 2.000105381011963, w1: 9.999371528625488

STEP 1100 - loss: 4.977664502803236e-08, w0: 2.000040054321289,

## Loading data from disk

### Locating the CSV files

The **taxifare dataset** files have been saved into `../data`.

Check that it is the case in the cell below, and, if not, regenerate the taxifare

In [11]:
!ls -l ../data/taxi*.csv

-rw-r--r-- 1 jupyter jupyter 123590 Apr 18 21:54 ../data/taxi-test.csv
-rw-r--r-- 1 jupyter jupyter 579055 Apr 18 21:54 ../data/taxi-train.csv
-rw-r--r-- 1 jupyter jupyter 123114 Apr 18 21:54 ../data/taxi-valid.csv


### Use tf.data to read the CSV files

The `tf.data` API can easily read csv files using the helper function tf.data.experimental.make_csv_dataset

If you have TFRecords (which is recommended), you may use tf.data.experimental.make_batched_features_dataset

The first step is to define 

- the feature names into a list `CSV_COLUMNS`
- their default values into a list `DEFAULTS`

In [12]:
CSV_COLUMNS = [
    'fare_amount',
    'pickup_datetime',
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude',
    'passenger_count',
    'key'
]
LABEL_COLUMN = 'fare_amount'

DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]

Let's now wrap the call to `make_csv_dataset` into its own function that will take only the file pattern (i.e. glob) where the dataset files are to be located:

Read: [make_csv_dataset](https://www.tensorflow.org/api_docs/python/tf/data/experimental/make_csv_dataset) documentation. The `pattern` will be given as an argument of the function but you should set the `batch_size`, `column_names` and `column_defaults`.

In [13]:
def create_dataset(pattern):
    
    return tf.data.experimental.make_csv_dataset(
        pattern, 1, CSV_COLUMNS, DEFAULTS)

tempds = create_dataset('../data/taxi-train*')
print(tempds)

<PrefetchDataset shapes: OrderedDict([(fare_amount, (1,)), (pickup_datetime, (1,)), (pickup_longitude, (1,)), (pickup_latitude, (1,)), (dropoff_longitude, (1,)), (dropoff_latitude, (1,)), (passenger_count, (1,)), (key, (1,))]), types: OrderedDict([(fare_amount, tf.float32), (pickup_datetime, tf.string), (pickup_longitude, tf.float32), (pickup_latitude, tf.float32), (dropoff_longitude, tf.float32), (dropoff_latitude, tf.float32), (passenger_count, tf.float32), (key, tf.string)])>


Note that this is a **prefetched dataset**, where each element is an `OrderedDict` whose keys are the feature names and whose values are tensors of shape `(1,)` (i.e. vectors).

Let's iterate over the two first element of this dataset using `dataset.take(2)` and let's convert them ordinary Python dictionary with numpy array as values for more readability:

In [14]:
for data in tempds.take(2):
    pprint({k: v.numpy() for k, v in data.items()})
    print("\n")

{'dropoff_latitude': array([40.758514], dtype=float32),
 'dropoff_longitude': array([-73.97567], dtype=float32),
 'fare_amount': array([9.], dtype=float32),
 'key': array([b'64'], dtype=object),
 'passenger_count': array([1.], dtype=float32),
 'pickup_datetime': array([b'2014-07-11 15:47:07 UTC'], dtype=object),
 'pickup_latitude': array([40.75126], dtype=float32),
 'pickup_longitude': array([-73.97404], dtype=float32)}


{'dropoff_latitude': array([40.7641], dtype=float32),
 'dropoff_longitude': array([-73.966415], dtype=float32),
 'fare_amount': array([9.], dtype=float32),
 'key': array([b'5029'], dtype=object),
 'passenger_count': array([1.], dtype=float32),
 'pickup_datetime': array([b'2013-07-10 08:40:07 UTC'], dtype=object),
 'pickup_latitude': array([40.78596], dtype=float32),
 'pickup_longitude': array([-73.95093], dtype=float32)}




### Transforming the features

What we really need is a dictionary of features + a label. So, we have to do two things to the above dictionary:

1. Remove the unwanted column "key"
1. Keep the label separate from the features

Let's first implement a function that takes as input a row (represented as an `OrderedDict` in our `tf.data.Dataset` as above) and then returns a tuple with two elements:

* The first element being the same `OrderedDict` with the label dropped
* The second element being the label itself (`fare_amount`)

Note: unwanted columes `pickup_datetime` and `key` needs to be dropped

In [16]:
UNWANTED_COLS = ['pickup_datetime', 'key']

def features_and_labels(row_data):
    label = row_data.pop(LABEL_COLUMN) # The .pop() method will return item and drop from frame
    features = row_data
    
    for unwanted_col in UNWANTED_COLS:
        features.pop(unwanted_col)

    return features, label

Let's iterate over 2 examples from our `tempds` dataset and apply our `feature_and_labels`
function to each of the examples to make sure it's working:

In [17]:
for row_data in tempds.take(2):
    features, label = features_and_labels(row_data)
    pprint(features)
    print(label, "\n")
    assert UNWANTED_COLS[0] not in features.keys()
    assert UNWANTED_COLS[1] not in features.keys()
    assert label.shape == [1]

OrderedDict([('pickup_longitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-73.97483], dtype=float32)>),
             ('pickup_latitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([40.742184], dtype=float32)>),
             ('dropoff_longitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-73.989174], dtype=float32)>),
             ('dropoff_latitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([40.7267], dtype=float32)>),
             ('passenger_count',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>)])
tf.Tensor([7.], shape=(1,), dtype=float32) 

OrderedDict([('pickup_longitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-73.9796], dtype=float32)>),
             ('pickup_latitude',
              <tf.Tensor: shape=(1,), dtype=float32, numpy=array([40.752693], dtype=float32)>),
             ('dropoff_longitude',
              <tf.Te

### Batching

Let's now refactor our `create_dataset` function so that it takes an additional argument `batch_size` and batch the data correspondingly. We will also use the `features_and_labels` function we implemented for our dataset to produce tuples of features and labels by applying a `.map(...)` method.

In [18]:
def create_dataset(pattern, batch_size):
    dataset = tf.data.experimental.make_csv_dataset(
        pattern, batch_size, CSV_COLUMNS, DEFAULTS)

    return dataset.map(features_and_labels)

Let's test that our batches are of the right size:

In [19]:
BATCH_SIZE = 2

tempds = create_dataset('../data/taxi-train*', batch_size=2)

for X_batch, Y_batch in tempds.take(2):
    pprint({k: v.numpy() for k, v in X_batch.items()})
    print(Y_batch.numpy(), "\n")
    assert len(Y_batch) == BATCH_SIZE

{'dropoff_latitude': array([40.78927 , 40.666294], dtype=float32),
 'dropoff_longitude': array([-73.94044, -73.78465], dtype=float32),
 'passenger_count': array([3., 1.], dtype=float32),
 'pickup_latitude': array([40.765556, 40.759056], dtype=float32),
 'pickup_longitude': array([-73.95767, -73.98107], dtype=float32)}
[ 7.  46.5] 

{'dropoff_latitude': array([40.78971 , 40.759026], dtype=float32),
 'dropoff_longitude': array([-73.977554, -73.99419 ], dtype=float32),
 'passenger_count': array([1., 1.], dtype=float32),
 'pickup_latitude': array([40.725666, 40.789974], dtype=float32),
 'pickup_longitude': array([-74.00752, -73.97701], dtype=float32)}
[20.   8.9] 



### Shuffling

**When training a deep learning model in batches over multiple workers, it is helpful if we shuffle the data. That way, different workers will be working on different parts of the input file at the same time, and so averaging gradients across workers will help.** Also, during training, we will need to read the data indefinitely.

Let's refactor our `create_dataset` function so that it shuffles the data, when the dataset is used for training.

We will introduce an additional argument `mode` to our function to allow the function body to distinguish the case
when it needs to shuffle the data (`mode == 'train'`) from when it shouldn't (`mode == 'eval'`).

Also, before returning we will want to prefetch 1 data point ahead of time (`dataset.prefetch(1)`) to speed-up training:

In [20]:
def create_dataset(pattern, batch_size=1, mode='eval'):
    dataset = tf.data.experimental.make_csv_dataset(
        pattern, batch_size, CSV_COLUMNS, DEFAULTS)

    dataset = dataset.map(features_and_labels).cache()

    if mode == 'train':
        dataset = dataset.shuffle(1000).repeat()

    # take advantage of multi-threading; 1=AUTOTUNE
    dataset = dataset.prefetch(1)
    
    return dataset

Let's check that our function works well in both modes:

In [21]:
tempds = create_dataset('../data/taxi-train*', 2, 'train')
print(list(tempds.take(1)))

[(OrderedDict([('pickup_longitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-73.776726, -73.957504], dtype=float32)>), ('pickup_latitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([40.645336, 40.717686], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-74.00243, -73.9942 ], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([40.728333, 40.686028], dtype=float32)>), ('passenger_count', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([1., 1.], dtype=float32)>)]), <tf.Tensor: shape=(2,), dtype=float32, numpy=array([52. , 15.5], dtype=float32)>)]


In [22]:
tempds = create_dataset('../data/taxi-valid*', 2, 'eval')
print(list(tempds.take(1)))

[(OrderedDict([('pickup_longitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-73.970634, -73.968346], dtype=float32)>), ('pickup_latitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([40.75192 , 40.760723], dtype=float32)>), ('dropoff_longitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([-73.95365, -73.97654], dtype=float32)>), ('dropoff_latitude', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([40.767395, 40.76535 ], dtype=float32)>), ('passenger_count', <tf.Tensor: shape=(2,), dtype=float32, numpy=array([3., 1.], dtype=float32)>)]), <tf.Tensor: shape=(2,), dtype=float32, numpy=array([8. , 5.5], dtype=float32)>)]


We can build model with this input pipeline.