<table align="left">
  <td>
    <a href="https://colab.research.google.com/github/marco-canas/didactica_ciencia_datos/blob/main/referentes/geron/part_2/c_11/c_11.ipynb#scrollTo=3496adae" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
  </td>
</table>

# Chapter 13. Loading and Preprocessing Data with TensorFlow

So far we have used only datasets that fit in memory, but Deep Learning systems are often trained on very large datasets that will not fit in RAM.

Ingesting a large dataset and preprocessing it efficiently can be tricky to implement with other Deep Learning libraries, but TensorFlow makes it easy thanks to the Data API: you just create a dataset object, and tell it where to get the data and how to transform it. 

TensorFlow takes care of all the implementation details, such as multithreading, queuing, batching, and prefetching. 

Moreover, the Data API works seamlessly with tf.keras!

Off the shelf, the Data API can read from text files (such as CSV files), binary files with fixed-size records, and binary files that use TensorFlow’s TFRecord format, which supports records of varying sizes. 

TFRecord is a flexible and efficient binary format based on Protocol Buffers (an open source binary format). 

The Data API also has support for reading from SQL databases. 

Moreover, many open source extensions are available to read from all sorts of data sources, such as Google’s BigQuery service.

Reading huge datasets efficiently is not the only difficulty: the data also needs to be preprocessed, usually normalized.

Moreover, it is not always composed strictly of convenient numerical fields: there may be text features, categorical features, and so on. 

These need to be encoded, for example using one-hot encoding, bag-of-words encoding, or embeddings (as we will see, an embedding is a trainable dense vector that represents a category or token). 

One option to handle all this preprocessing is to write your own custom preprocessing layers. 

Another is to use the standard preprocessing layers provided by Keras.

In this chapter, we will cover the Data API, the TFRecord format, and how to create custom preprocessing layers and use the standard Keras ones. 

We will also take a quick look at a few related projects from TensorFlow’s ecosystem:

- **TF Transform (tf.Transform)**: makes it possible to write a single preprocessing function that can be run in batch mode on your full training set, before training (to speed it up), and then exported to a TF Function and incorporated into your trained model so that once it is deployed in production it can take care of preprocessing new instances on the fly.
- **TF Datasets (TFDS)**: provides a convenient function to download many common datasets of all kinds, including large ones like ImageNet, as well as convenient dataset objects to manipulate them using the Data API.

So let’s get started!

## The Data API

The whole Data API revolves around the concept of a dataset: as you might suspect, this represents a sequence of data items. Usually you will use datasets that gradually read data from disk, but for simplicity let’s create a dataset entirely in RAM using `tf.data.Dataset.from_tensor_slices()`:

In [1]:
import tensorflow as tf 

In [2]:
X = tf.range(10)     # any data tensor
dataset = tf.data.Dataset.from_tensor_slices(X)
dataset

<TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

The from_tensor_slices() function takes a tensor and creates a tf.data.Dataset whose elements are all the slices of X (along the first dimension), so this dataset contains 10 items: tensors 0, 1, 2, …, 9. 

In this case we would have obtained the same dataset if we had used `tf.data.Dataset.range(10)`, except that its elements would be 64-bit integers instead of 32-bit integers.
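
To make the "slices along the first dimension" idea concrete, here is a minimal sketch with a tuple of tensors. The toy `features` and `labels` values are made up for illustration; the point is only that each dataset item pairs one row of the first tensor with one element of the second.

```python
import tensorflow as tf

# Hypothetical toy data: 3 instances with 2 features each, plus 3 labels.
features = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
labels = tf.constant([0, 1, 0])

# Slicing happens along the first dimension of every tensor in the tuple,
# so each dataset item is one (feature row, label) pair.
pairs = tf.data.Dataset.from_tensor_slices((features, labels))
items = list(pairs.as_numpy_iterator())

for x, y in items:
    print(x, y)
```

The dataset holds three items, one per row of `features`, and each feature slice has shape `(2,)`.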

You can simply iterate over a dataset’s items like this:

In [3]:
for item in dataset:
    print(item)


tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


## Chaining Transformations

Once you have a dataset, you can apply all sorts of transformations to it by calling its transformation methods. 

Each method returns a new dataset, so you can chain transformations like this (this chain is illustrated in Figure 13-1):

In [4]:
dataset = dataset.repeat(3).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


In this example, we first call the `repeat()` method on the original dataset, and it returns a new dataset that will repeat the items of the original dataset three times. 

Of course, this will not copy all the data in memory three times! 

(If you call this method with no arguments, the new dataset will repeat the source dataset forever, so the code that iterates over the dataset will have to decide when to stop.) 

Then we call the `batch()` method on this new dataset, and again this creates a new dataset. 

This one will group the items of the previous dataset in batches of seven items. 

Finally, we iterate over the items of this final dataset. 

As you can see, the `batch()` method had to output a final batch of size two instead of seven, but you can call it with `drop_remainder=True` if you want it to drop this final batch so that all batches have the exact same size.
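
The effect of `drop_remainder=True` can be checked on the same 30-item sequence. This is a small sketch that rebuilds the repeated dataset from `tf.data.Dataset.range(10)` so that it stands on its own:

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10).repeat(3)  # 30 items in total

# Default behavior: the last batch holds whatever is left over.
with_remainder = [batch.tolist()
                  for batch in dataset.batch(7).as_numpy_iterator()]

# With drop_remainder=True the incomplete final batch is discarded.
without_remainder = [batch.tolist()
                     for batch in dataset.batch(7, drop_remainder=True).as_numpy_iterator()]

print(len(with_remainder), len(with_remainder[-1]))
print(len(without_remainder), len(without_remainder[-1]))
```

Since 30 = 4 × 7 + 2, the first variant yields five batches (the last with two items) and the second yields four full batches.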

### WARNING

The dataset methods do not modify datasets, they create new ones, so make sure to keep a reference to these new datasets (e.g., with dataset = ...), or else nothing will happen.
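
A short sketch of this pitfall, using a fresh range dataset for illustration: calling `batch()` without keeping the result leaves the original dataset untouched.

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10)

# The returned dataset is discarded, so `dataset` still yields 10 scalars.
dataset.batch(5)
unchanged = [int(v) for v in dataset.as_numpy_iterator()]

# Keeping a reference to the new dataset is what makes the change visible.
batched = dataset.batch(5)
batches = [b.tolist() for b in batched.as_numpy_iterator()]

print(unchanged)
print(batches)
```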

You can also transform the items by calling the map() method. 

For example, this creates a new dataset with all items doubled:

In [5]:
dataset = dataset.map(lambda x: x * 2) # Items: [0,2,4,6,8,10,12]

This function is the one you will call to apply any preprocessing you want to your data. 

Sometimes this will include computations that can be quite intensive, such as reshaping or rotating an image, so you will usually want to spawn multiple threads to speed things up: it’s as simple as setting the num_parallel_calls argument. 

Note that the function you pass to the map() method must be convertible to a TF Function (see Chapter 12).
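
As a rough illustration of `num_parallel_calls`, the sketch below maps a stand-in function over a small dataset with two parallel calls. The function `preprocess_item` is a hypothetical placeholder for a costlier transformation; it uses only TensorFlow operations so that it can be converted to a TF Function.

```python
import tensorflow as tf

def preprocess_item(x):
    # Stand-in for an expensive transformation; only TF ops are used here.
    return tf.square(x)

dataset = tf.data.Dataset.range(5)
dataset = dataset.map(preprocess_item, num_parallel_calls=2)

values = [int(v) for v in dataset.as_numpy_iterator()]
print(values)
```

With the default options the items still come out in their original order, even though several calls may run concurrently.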

While the map() method applies a transformation to each item, the apply() method applies a transformation to the dataset as a whole. 

For example, the following code applies the unbatch() function to the dataset (this function lives in `tf.data.experimental`; recent TensorFlow releases also expose it directly as the dataset’s `unbatch()` method and deprecate the experimental version).

Each item in the new dataset will be a single-integer tensor instead of a batch of seven integers:

In [None]:
dataset = dataset.apply(tf.data.experimental.unbatch()) # Items: 0,2,4,...
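
The `apply()` method also accepts your own dataset-level function. The sketch below is an illustrative assumption rather than part of the chapter's pipeline: `small_values_in_pairs` is a made-up helper that receives a whole dataset and returns a new one.

```python
import tensorflow as tf

def small_values_in_pairs(ds):
    # Dataset-level transformation: takes a dataset, returns a new dataset.
    return ds.filter(lambda x: x < 5).batch(2)

dataset = tf.data.Dataset.range(10).apply(small_values_in_pairs)
batches = [b.tolist() for b in dataset.as_numpy_iterator()]
print(batches)
```

Packaging several steps into one function like this can make it easier to reuse the same sequence of transformations on different datasets.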

It is also possible to simply filter the dataset using the filter() method:

In [None]:
dataset = dataset.filter(lambda x: x < 10) # Items: 0 2 4 6 8 0 2 4 6...

You will often want to look at just a few items from a dataset. 

You can use the take() method for that:

In [None]:
for item in dataset.take(3):
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)

## Shuffling the Data

As you know, Gradient Descent works best when the instances in the training set are independent and identically distributed (see Chapter 4). 

A simple way to ensure this is to shuffle the instances, using the `shuffle()` method. 

It will create a new dataset that will start by filling up a buffer with the first items of the source dataset. 

Then, whenever it is asked for an item, it will pull one out randomly from the buffer and replace it with a fresh one from the source dataset, until it has iterated entirely through the source dataset. 

At this point it continues to pull out items randomly from the buffer until it is empty. 

You must specify the buffer size, and it is important to make it large enough, or else shuffling will not be very effective. 

Just don’t exceed the amount of RAM you have, and even if you have plenty of it, there’s no need to go beyond the dataset’s size. 

You can provide a random seed if you want the same random order every time you run your program.
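
The buffer mechanism described above can be sketched in plain Python. This is a simplified model of the idea, not TensorFlow's actual implementation; the function name `buffered_shuffle` is made up for illustration.

```python
import itertools
import random

def buffered_shuffle(items, buffer_size, seed=None):
    """Yield items in a pseudo-random order using a fixed-size buffer."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    rng = random.Random(seed)
    source = iter(items)
    # Start by filling the buffer with the first items of the source.
    buffer = list(itertools.islice(source, buffer_size))
    # Each time an item is requested, pull a random one from the buffer
    # and replace it with a fresh item from the source.
    for fresh in source:
        index = rng.randrange(len(buffer))
        yield buffer[index]
        buffer[index] = fresh
    # The source is exhausted: drain the buffer in random order.
    while buffer:
        yield buffer.pop(rng.randrange(len(buffer)))

shuffled = list(buffered_shuffle(range(30), buffer_size=5, seed=42))
print(shuffled)
```

The sketch also shows why a small buffer shuffles poorly: the item produced at position k can only come from the first k + buffer_size source items, so with `buffer_size=1` the order does not change at all.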

For example, the following code creates and displays a dataset containing the integers 0 to 9, repeated 3 times, shuffled using a buffer of size 5 and a random seed of 42, and batched with a batch size of 7:

In [11]:
dataset = tf.data.Dataset.range(10).repeat(3) # 0 to 9, three times
dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7)
for item in dataset:
    print(item)

tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64)
tf.Tensor([3 6], shape=(2,), dtype=int64)


### TIP

If you call repeat() on a shuffled dataset, by default it will generate a new order at every iteration. 

This is generally a good idea, but if you prefer to reuse the same order at each iteration (e.g., for tests or debugging), you can set reshuffle_each_iteration=False.
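
A small sketch of this option: with `reshuffle_each_iteration=False`, both passes produced by `repeat(2)` should follow the same shuffled order. The buffer size and seed below are arbitrary choices for illustration.

```python
import tensorflow as tf

dataset = tf.data.Dataset.range(10)
dataset = dataset.shuffle(buffer_size=10, seed=42,
                          reshuffle_each_iteration=False)
dataset = dataset.repeat(2)

order = [int(v) for v in dataset.as_numpy_iterator()]
first_pass, second_pass = order[:10], order[10:]
print(first_pass)
print(second_pass)
```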

For a large dataset that does not fit in memory, this simple shuffling-buffer approach may not be sufficient, since the buffer will be small compared to the dataset.

One solution is to shuffle the source data itself (for example, on Linux you can shuffle text files using the shuf command). This will definitely improve shuffling a lot!

Even if the source data is shuffled, you will usually want to shuffle it some more, or else the same order will be repeated at each epoch, and the model may end up being biased (e.g., due to some spurious patterns present by chance in the source data’s order).

To shuffle the instances some more, a common approach is to split the source data into multiple files, then read them in a random order during training. However, instances located in the same file will still end up close to each other. To avoid this you can pick multiple files randomly and read them simultaneously, interleaving their records. Then on top of that you can add a shuffling buffer using the shuffle() method.

If all this sounds like a lot of work, don’t worry: the Data API makes all this possible in just a few lines of code. Let’s see how to do this.

### Interleaving Lines from Multiple Files

First, let’s suppose that you’ve loaded the California housing dataset, shuffled it (unless it was already shuffled), and split it into a training set, a validation set, and a test set.

Then you split each set into many CSV files that each look like this (each row contains eight input features plus the target median house value):

MedInc,HouseAge,AveRooms,AveBedrms,Popul,AveOccup,Lat,Long,MedianHouseValue
3.5214,15.0,3.0499,1.1065,1447.0,1.6059,37.63,-122.43,1.442
5.3275,5.0,6.4900,0.9910,3464.0,3.4433,33.69,-117.39,1.687
3.1,29.0,7.5423,1.5915,1328.0,2.2508,38.44,-122.98,1.621
[...]

Let’s also suppose `train_filepaths` contains the list of training file paths (and you also have `valid_filepaths` and `test_filepaths`):

In [10]:
train_filepaths

['datasets/housing/my_train_00.csv', 'datasets/housing/my_train_01.csv',...]

Alternatively, you could use file patterns; for example, `train_filepaths = "datasets/housing/my_train_*.csv"`. Now let’s create a dataset containing only these file paths:

In [9]:
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

By default, the list_files() function returns a dataset that shuffles the
file paths. In general this is a good thing, but you can set shuffle=False if
you do not want that for some reason.

Next, you can call the interleave() method to read from five files at a time and interleave their lines (skipping the first line of each file, which is the header row, using the skip() method):

In [8]:
n_readers = 5
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers)

The interleave() method will create a dataset that will pull five file paths
from the filepath_dataset, and for each one it will call the function you
gave it (a lambda in this example) to create a new dataset (in this case a
TextLineDataset). To be clear, at this stage there will be seven datasets in
all: the filepath dataset, the interleave dataset, and the five
TextLineDatasets created internally by the interleave dataset. When we
iterate over the interleave dataset, it will cycle through these five
TextLineDatasets, reading one line at a time from each until all datasets
are out of items. Then it will get the next five file paths from the
filepath_dataset and interleave them the same way, and so on until it
runs out of file paths.
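
The cycling behavior described above can be modeled in plain Python. This is a simplified sketch that follows the description in the text, not TensorFlow's implementation (which has further options and may differ in how it refills readers); the `interleave` function, the `files` dictionary, and the `read_lines` helper are all made up for illustration.

```python
from itertools import islice

def interleave(sources, make_iterable, cycle_length):
    """Read one item at a time from groups of `cycle_length` sources."""
    sources = iter(sources)
    while True:
        # Open the next group of readers (up to cycle_length of them).
        group = [iter(make_iterable(s)) for s in islice(sources, cycle_length)]
        if not group:
            return  # no sources left
        # Cycle through the group until every reader is exhausted.
        while group:
            still_active = []
            for reader in group:
                try:
                    item = next(reader)
                except StopIteration:
                    continue
                still_active.append(reader)
                yield item
            group = still_active

# Hypothetical in-memory "files": a header row followed by two data rows.
files = {
    "a.csv": ["header", "a1", "a2"],
    "b.csv": ["header", "b1", "b2"],
    "c.csv": ["header", "c1", "c2"],
}

def read_lines(name):
    return files[name][1:]  # skip the header row, like skip(1)

lines = list(interleave(files, read_lines, cycle_length=2))
print(lines)
```

With `cycle_length=2`, the rows of the first two files alternate before the third file is read.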

### TIP

For interleaving to work best, it is preferable to have files of identical length; otherwise the ends of the longest files will not be interleaved.

By default, interleave() does not use parallelism; it just reads one line at a time from each file, sequentially. 

If you want it to actually read files in parallel, you can set the num_parallel_calls argument to the number of threads you want (note that the map() method also has this argument). 

You can even set it to tf.data.experimental.AUTOTUNE to make TensorFlow choose the right number of threads dynamically based on the available CPU (this was an experimental feature when this text was written; recent TensorFlow releases also provide the same constant as tf.data.AUTOTUNE).

Let’s look at what the dataset contains now:

In [None]:
for line in dataset.take(5):
    print(line.numpy())

b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782'
b'4.1812,52.0,5.7013,0.9965,692.0,2.4027,33.73,-118.31,3.215'
b'3.6875,44.0,4.5244,0.9930,457.0,3.1958,34.04,-118.15,1.625'
b'3.3456,37.0,4.5140,0.9084,458.0,3.2253,36.67,-121.7,2.526'
b'3.5214,15.0,3.0499,1.1065,1447.0,1.6059,37.63,-122.43,1.442'

These are the first rows (ignoring the header row) of five CSV files, chosen randomly. 

Looks good! But as you can see, these are just byte strings; we need to parse them and scale the data.

### Preprocessing the Data

Let’s implement a small function that will perform this preprocessing:

In [7]:
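X_mean, X_std = [...] # placeholder: mean and standard deviation of each feature in the training set
n_inputs = 8

def preprocess(line):
    # Default of 0. for each feature; no default for the target (a missing target raises an error)
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1])  # features as a 1D tensor
    y = tf.stack(fields[-1:])  # target as a 1D tensor with a single value
    return (x - X_mean) / X_std, y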

Let’s walk through this code:

First, the code assumes that we have precomputed the mean and standard deviation of each feature in the training set. `X_mean` and `X_std` are just 1D tensors (or NumPy arrays) containing eight floats, one per input feature.

The preprocess() function takes one CSV line and starts by parsing it. 

For this it uses the tf.io.decode_csv() function, which takes two arguments: the first is the line to parse, and the second is an array containing the default value for each column in the CSV file. 

This array tells TensorFlow not only the default value for each column, but also the number of columns and their types.

In this example, we tell it that all feature columns are floats and that missing values should default to 0, but we provide an empty array of type tf.float32 as the default value for the last column (the target): the array tells TensorFlow that this column contains floats, but that there is no default value, so it will raise an exception if it encounters a missing value.

The decode_csv() function returns a list of scalar tensors (one per column), but we need to return 1D tensor arrays. 

So we call tf.stack() on all tensors except for the last one (the target): this will stack these tensors into a 1D array. 

We then do the same for the target value (this makes it a 1D tensor array with a single value, rather than a scalar tensor).

Finally, we scale the input features by subtracting the feature means and then dividing by the feature standard deviations, and we return a tuple containing the scaled features and the target.
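
The role of the default values can be checked on a toy record. The sketch below assumes a made-up layout of two features and one target (instead of eight features), and the CSV strings are invented for illustration.

```python
import tensorflow as tf

n_inputs = 2  # toy setting: two features followed by one target
defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]

# The second feature is missing, so it falls back to its default of 0.
fields = tf.io.decode_csv("1.5,,3.0", record_defaults=defs)
x = tf.stack(fields[:-1])
y = tf.stack(fields[-1:])
print(x.numpy(), y.numpy())

# The target is missing and has no default, so parsing is expected to fail.
try:
    tf.io.decode_csv("1.5,2.5,", record_defaults=defs)
    target_error = None
except tf.errors.InvalidArgumentError as error:
    target_error = error
print(type(target_error).__name__)
```

Note that the stacked target has shape `(1,)` rather than being a scalar, matching what `preprocess()` returns.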

Let’s test this preprocessing function:

In [None]:
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: id=6227, shape=(8,), dtype=float32, numpy=
array([ 0.16579159, 1.216324 , -0.05204564, -0.39215982, -0.5277444 ,
-0.2633488 , 0.8543046 , -1.3072058 ], dtype=float32)>,
<tf.Tensor: [...], numpy=array([2.782], dtype=float32)>)

Looks good! We can now apply the function to the dataset.

## Putting Everything Together

To make the code reusable, let’s put together everything we have discussed so far into a small helper function: it will create and return a dataset that will efficiently load California housing data from multiple CSV files, preprocess it, shuffle it, optionally repeat it, and batch it (see Figure 13-2):

In [6]:
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
                       n_read_threads=None, shuffle_buffer_size=10000,
                       n_parse_threads=5, batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths)
    dataset = dataset.interleave(
        lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
        cycle_length=n_readers, num_parallel_calls=n_read_threads)
    dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
    dataset = dataset.shuffle(shuffle_buffer_size).repeat(repeat)
    return dataset.batch(batch_size).prefetch(1)

Everything should make sense in this code, except the very last line (prefetch(1)), which is important for performance.