# Tensorflow Data API

- Data API: criamos um objeto do tipo dataset e dizemos para ele onde obter os dados e como transformá-los.
- O TF toma conta de toda a implementação.
- O Data API trabalha perfeitamente bem com o tf.keras.
- Vamos trabalhar com Data API, TFRecorder e como criar camadas de pré-processamento customizadas.
- TF Transforms (tf.transform): Torna possível escrever uma única função de pré-processamento que pode ser executada em batch sobre todo o conjunto de treino.
- TF Dataset (TFDS): Fonte de datasets de onde podemos baixar alguns datasets mais comuns.

In [1]:
import keras.backend
import numpy as np
from pprint import pprint
import tensorflow as tf

In [2]:
X = tf.range(10) # dados de treino qualquer
dataset = tf.data.Dataset.from_tensor_slices(X) # criamos um dataset a partir do X

In [3]:
for item in dataset:
    pprint(item)

<tf.Tensor: shape=(), dtype=int32, numpy=0>
<tf.Tensor: shape=(), dtype=int32, numpy=1>
<tf.Tensor: shape=(), dtype=int32, numpy=2>
<tf.Tensor: shape=(), dtype=int32, numpy=3>
<tf.Tensor: shape=(), dtype=int32, numpy=4>
<tf.Tensor: shape=(), dtype=int32, numpy=5>
<tf.Tensor: shape=(), dtype=int32, numpy=6>
<tf.Tensor: shape=(), dtype=int32, numpy=7>
<tf.Tensor: shape=(), dtype=int32, numpy=8>
<tf.Tensor: shape=(), dtype=int32, numpy=9>


# Transformações encadeadas

Visto que tenhamos um dataset, podemos aplicar qualquer tipo de transformação sobre ele.

Obs.: cada método retorna um novo dataset.


In [4]:
# no código abaixo, repetimos o dataset 3 vezes (os dados são copiados para memória 3x) e então criarmos um batch de tamanho 7
dataset = dataset.repeat(3).batch(7)

![chaining_transofrms](https://miro.medium.com/max/720/1*pQPYe49MSBau47N8fRaZ1w.png)

In [5]:
for item in dataset:
    print(item)

tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32)
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32)
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32)
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32)
tf.Tensor([8 9], shape=(2,), dtype=int32)


In [6]:
# Podemos ainda realizar transformações sobre o dataset usando o método map()
dataset = dataset.map(lambda x: x * 2)
for item in dataset:
    print(item)

tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32)
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32)
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32)
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32)
tf.Tensor([16 18], shape=(2,), dtype=int32)


In [7]:
# Se quisermos aplicar uma transformação sobre o dataset todo, usamos o apply()
dataset = dataset.apply(tf.data.experimental.unbatch())
for item in dataset:
    print(item)

Instructions for updating:
Use `tf.data.Dataset.unbatch()`.
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=int32)
tf.Tensor(12, shape=(), dtype=int32)
tf.Tensor(14, shape=(), dtype=int32)
tf.Tensor(16, shape=(), dtype=int32)
tf.Tensor(18, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(10, shape=(), dtype=

In [8]:
# Podemos ainda aplicar um filtro (mask) sobre o dataset usando o filter()
dataset = dataset.filter(lambda x: x< 10)

In [9]:
for item in dataset:
    print(item)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)


Sabemos que o algoritmos Gradient Descent trabalha melhor quando as instâncias de treino são independentes e identicamente distribuídas.

Uma boa maneira de garantirmos esta condição é realizar um shuffle das instâncias.


In [10]:
dataset_new = tf.data.Dataset.range(10).repeat(3)
dataset_new = dataset_new.shuffle(buffer_size=5, seed=42).batch(7)  # o valor para o buffer_size tem de ser escolhido com cuidado para não estourar a RAM

In [11]:
for item in dataset_new:
    print(item)

tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64)
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64)
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64)
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64)
tf.Tensor([3 6], shape=(2,), dtype=int64)


# Pré-Processamento dos Dados

Vamos criar uma função para realizar o pré-processamento.

In [12]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import os
import numpy as np

In [13]:
housing = fetch_california_housing()
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42
)

In [14]:
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42
)

In [15]:
scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_

print(f"X_mean {X_mean}\nX_std: {X_std}")

X_mean [ 3.89175860e+00  2.86245478e+01  5.45593655e+00  1.09963474e+00
  1.42428122e+03  2.95886657e+00  3.56464315e+01 -1.19584363e+02]
X_std: [1.90927329e+00 1.26409177e+01 2.55038070e+00 4.65460128e-01
 1.09576000e+03 2.36138048e+00 2.13456672e+00 2.00093304e+00]


In [16]:
# Precisamos realizar um reshape sobre o housing.target para ser q coluna e X linhas.
t = housing.target.reshape(-1, 1)
pprint(t)

array([[4.526],
       [3.585],
       [3.521],
       ...,
       [0.923],
       [0.847],
       [0.894]])


In [41]:
y_train.shape

(11610, 1)

In [44]:
# Quando o dataset é muito grande para caber na memória, o usual é divide-lo em uma
# série de arquivos menores.
def split_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    housing_dir = "../data_raw"
    path_format = os.path.join(housing_dir, "{}_{:02d}.csv")

    file_paths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        file_paths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                f.write(",".join([repr(col) for col in data[row_idx]]))
                f.write("\n")
    return file_paths

In [45]:
train_data = np.c_[X_train, y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test, y_test]
header_cols = housing.feature_names + ["MedianHousingValues"]
header = ",".join(header_cols)

In [46]:
train_filepaths = split_multiple_csv_files(train_data, "train", header=header, n_parts=20)
valid_filepaths = split_multiple_csv_files(valid_data, "valid", header=header, n_parts=20)
test_filepaths = split_multiple_csv_files(test_data, "test", header=header, n_parts=20)

In [48]:
# uma rápida checagem
import pandas as pd
pd.read_csv(train_filepaths[2]).head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHousingValues
0,1.6571,34.0,4.454976,1.087678,1358.0,3.218009,37.94,-122.35,1.052
1,3.6301,7.0,5.76669,1.070274,4399.0,3.091356,33.25,-117.32,1.709
2,6.2516,9.0,6.972376,0.972376,1139.0,3.146409,34.15,-117.22,1.593
3,2.0077,20.0,4.305195,1.099567,1452.0,3.142857,32.75,-117.08,1.183
4,2.3567,26.0,2.292829,1.119522,2660.0,2.649402,34.07,-118.29,2.531


In [49]:
# por padrão, o list_files() retorna um dataset que mistura os caminhos dos arquivos
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)

In [50]:
for filepath in filepath_dataset:
    pprint(filepath)

<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_15.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_08.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_03.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_01.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_10.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_05.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_19.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_16.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_02.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_09.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_00.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_07.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\data_raw\\train_12.csv'>
<tf.Tensor: shape=(), dtype=string, numpy=b'..\\dat

Usamos o método `interleave()` para ler um cinco linhas de um arquivo de uma só vez e intercalar suas linhas.

In [51]:
n_reads = 5
dataset = filepath_dataset.interleave(
    lambda _filepath: tf.data.TextLineDataset(_filepath).skip(1),
    cycle_length=n_reads
)

In [52]:
for line in dataset.take(5):
    pprint(line.numpy())

(b'4.6477,38.0,5.03728813559322,0.911864406779661,745.0,2.5254237288135593,32.6'
 b'4,-117.07,1.504')
(b'8.72,44.0,6.163179916317992,1.0460251046025104,668.0,2.794979079497908,34.2,'
 b'-118.18,4.159')
(b'3.8456,35.0,5.461346633416459,0.9576059850374065,1154.0,2.8778054862842892,3'
 b'7.96,-122.05,1.598')
(b'3.3456,37.0,4.514084507042254,0.9084507042253521,458.0,3.2253521126760565,36'
 b'.67,-121.7,2.526')
(b'3.6875,44.0,4.524475524475524,0.993006993006993,457.0,3.195804195804196,34.0'
 b'4,-118.15,1.625')


In [53]:
n_inputs = X_train.shape[1]

In [54]:
@tf.function
def preprocessing(line):
    defs = [0.] * n_inputs + [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(fields[:-1]) # de 0 a 7 -- excluímos o 8
    y = tf.stack(fields[-1:]) # apenas o 8

    return (x - X_mean) / X_std, y

In [56]:
preprocessing(b'4.290, 44.0, 5.35, 0.91, 846.0, 2.33, 37.47, -122.2, 2.786')

(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.20858265,  1.216324  , -0.04153753, -0.4074135 , -0.5277444 ,
        -0.26631314,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.786], dtype=float32)>)

Vamos juntar tudo em uma única função seguindo o pipeline abaixo.

![pipeline](https://miro.medium.com/max/720/1*lO-22xUVBSwHvt3Zrd-CJw.png)

In [57]:
def read_dataset(filepaths, repeat=1,
                 n_readrs=5,
                 n_read_threads=None,
                 shuffle_buff_size=1000,
                 n_parse_threads=5,
                 batch_size=32):
    dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    dataset = dataset.interleave(lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
                                 cycle_length=n_readrs,
                                 num_parallel_calls=n_read_threads)
    dataset = dataset.shuffle(shuffle_buff_size)
    dataset = dataset.map(preprocessing, num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size=batch_size)

    return dataset.prefetch(1)


Sobre o `prefetch()`: quando o chamamos ao final do, criamos um dataset que irá "se esforçar" para sempre estar um batch a frente.

![pipeline](https://miro.medium.com/max/720/1*r7jxg8IJ88MWrC-IA-q1eg.png)


Traduzindo: enquanto nosso algoritmo de treino está trabalhando em um batch, o dataset irá trabalhar em paralelo para ter o próximo batch já pronto.

In [58]:
tf.random.set_seed(42)

train_set = read_dataset(train_filepaths, batch_size=3)
for X_batch, y_batch in train_set.take(2):
    pprint(f"X = {X_batch}")
    pprint(f"y = {y_batch}\n")

('X = [[ 0.5804519  -0.20762321  0.05616303 -0.15191229  0.01343246  '
 '0.00604472\n'
 '   1.2525111  -1.3671792 ]\n'
 ' [-0.21650054 -0.91959685 -0.37069115 -0.15282531  1.9280853   0.6030511\n'
 '  -0.7338394   0.8018072 ]\n'
 ' [-0.37792316 -0.2867314  -0.44674355 -0.02454283  1.0081758  -0.3701026\n'
 '   0.79340184 -1.1822641 ]]')
'y = [[1.752]\n [1.522]\n [2.561]]\n'
('X = [[-1.0393791   0.02970133  0.0704432   0.01656396 -0.14901187  '
 '0.2554778\n'
 '   0.69033587 -0.2876847 ]\n'
 ' [-0.47738516  0.10880951 -0.23843908 -0.0527132   0.29999155  0.21409526\n'
 '  -0.67293835  0.5919061 ]\n'
 ' [ 0.6041258   0.8998913  -0.00953289  0.17155597 -0.63817006  0.05904346\n'
 '  -0.8181658   0.6118972 ]]')
'y = [[0.792]\n [1.915]\n [2.319]]\n'


Vamos agora para a última parte, em que juntamos com o `tf.keras`.

Usamos a função `read_dataset()` que acabamos de criar para gerar um conjunto de treino, um de validação e um de teste.

In [59]:
train_set = read_dataset(train_filepaths, repeat=None)
test_set = read_dataset(test_filepaths)
valid_set = read_dataset(valid_filepaths)

Agora podemos construir e treinar um modelo com o Keras usando estes datasets passando os de treino e validação para o `fit()`

In [60]:
# antes, vamos ver com que cara do dataset ficou
for item in train_set.take(1):
    pprint(item)

(<tf.Tensor: shape=(32, 8), dtype=float32, numpy=
array([[ 9.40889344e-02, -1.55246222e+00,  4.79670703e-01,
        -3.21188346e-02,  1.61275044e-01, -3.14145871e-02,
         1.40711010e+00, -6.92494690e-01],
       [-1.04775929e+00,  1.13721585e+00,  1.67430863e-02,
         2.59568132e-02, -2.85903156e-01,  2.89581902e-02,
         9.90162194e-01, -1.29721212e+00],
       [-2.78356493e-01, -1.39424586e+00,  2.80269951e-01,
        -1.49353482e-02, -7.76886582e-01, -6.93427771e-02,
         1.69756675e+00, -8.22435141e-01],
       [ 2.32650900e+00, -3.65839571e-01,  9.64194715e-01,
        -8.51519182e-02, -3.13281417e-01, -1.08566359e-01,
         9.43315029e-01, -1.17227042e+00],
       [-1.04168367e+00,  5.83458602e-01, -6.80176437e-01,
        -1.12405427e-01,  1.48820794e+00,  4.01368171e-01,
        -7.47894943e-01,  9.11756575e-01],
       [-9.01996970e-01,  5.83458602e-01, -5.05193770e-01,
         1.08205207e-01,  1.32941401e+00,  2.72037178e-01,
        -7.29155362e-01,  7

In [62]:
tf.keras.backend.clear_session()
np.random.seed(42)
tf.random.set_seed(42)

model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(32, activation='relu', input_shape=X_train.shape[1:]),
    tf.keras.layers.Dense(1)
])

In [63]:
model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 32)                288       
                                                                 
 dense_1 (Dense)             (None, 1)                 33        
                                                                 
Total params: 321
Trainable params: 321
Non-trainable params: 0
_________________________________________________________________


In [65]:
# compile o modelo
model.compile(loss="mse", optimizer=tf.keras.optimizers.SGD(learning_rate=1e-3))

In [66]:
batch_size = 32
model.fit(train_set, steps_per_epoch=len(X_train) // batch_size,
          epochs=20,
          validation_data=valid_set)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.callbacks.History at 0x1f37f042520>

In [67]:
# avaliação do modelo
model.evaluate(test_set, steps=len(X_test) // batch_size)



0.43188807368278503

In [69]:
# Agora podemos fazer predições
new_set = test_set.map(lambda X, y: X) # Queremos apenas os valores e não as labels. O keras teria ignorado mesmo assim.

In [72]:
X_new = model.predict(new_set, steps=len(X_test) // batch_size)



In [73]:
X_new

array([[1.7827599],
       [2.4168222],
       [2.1397085],
       ...,
       [3.1641123],
       [2.0432794],
       [1.8787067]], dtype=float32)