# Chapter 13 – Loading and Preprocessing Data with TensorFlow


In [11]:
import tensorflow as tf

### The tf.data API


In [3]:
# Show the docstring and public contents (subpackages, constants) of tf.data.
help(tf.data)

Help on package tensorflow._api.v2.data in tensorflow._api.v2:

NAME
    tensorflow._api.v2.data - `tf.data.Dataset` API for input pipelines.

DESCRIPTION
    See [Importing Data](https://tensorflow.org/guide/data) for an overview.

PACKAGE CONTENTS
    experimental (package)

DATA
    AUTOTUNE = -1
    INFINITE_CARDINALITY = -1
    UNKNOWN_CARDINALITY = -2

FILE
    c:\users\sayed\.conda\envs\homl3\lib\site-packages\tensorflow\_api\v2\data\__init__.py




In [4]:
# Integers 0..9 as a 1-D int32 tensor (explicit start/limit form).
tf.range(0, 10)

<tf.Tensor: shape=(10,), dtype=int32, numpy=array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])>

In [6]:
# One dataset element per slice along the first axis of the tensor.
dataset = tf.data.Dataset.from_tensor_slices(tensors=tf.range(10))
dataset

<_TensorSliceDataset element_spec=TensorSpec(shape=(), dtype=tf.int32, name=None)>

In [9]:
# Datasets are streams rather than sequences: positional indexing fails.
try:
    dataset[0]
except Exception as err:
    print(err)

'_TensorSliceDataset' object is not subscriptable


In [10]:
# Number of elements: one per slice of tf.range(10).
len(dataset)

10

In [15]:
# Datasets are iterable: pull the first three elements one at a time.
it = iter(dataset)
for step in range(3):
    print(next(it))

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)


In [16]:
# A plain for-loop walks the whole dataset, yielding eager tensors.
for element in dataset:
    print(element)

tf.Tensor(0, shape=(), dtype=int32)
tf.Tensor(1, shape=(), dtype=int32)
tf.Tensor(2, shape=(), dtype=int32)
tf.Tensor(3, shape=(), dtype=int32)
tf.Tensor(4, shape=(), dtype=int32)
tf.Tensor(5, shape=(), dtype=int32)
tf.Tensor(6, shape=(), dtype=int32)
tf.Tensor(7, shape=(), dtype=int32)
tf.Tensor(8, shape=(), dtype=int32)
tf.Tensor(9, shape=(), dtype=int32)


In [17]:
# Nested structures are sliced leaf by leaf; each element keeps the same nesting.
X_nested = {"a": ([1, 2, 3], [4, 5, 6]), "b": [7, 8, 9]}
dataset = tf.data.Dataset.from_tensor_slices(X_nested)

for element in dataset:
    print(element)

{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=1>, <tf.Tensor: shape=(), dtype=int32, numpy=4>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=7>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=2>, <tf.Tensor: shape=(), dtype=int32, numpy=5>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=8>}
{'a': (<tf.Tensor: shape=(), dtype=int32, numpy=3>, <tf.Tensor: shape=(), dtype=int32, numpy=6>), 'b': <tf.Tensor: shape=(), dtype=int32, numpy=9>}


In [25]:
# Every leaf must share the same first dimension: "a" has 3 rows, "b" has 4.
try:
    X_nested = {"a": ([1, 2, 3], [4, 5, 6]), "b": [7, 8, 9, 10]}
    dataset = tf.data.Dataset.from_tensor_slices(X_nested)
    for item in dataset:
        print(item)
except Exception as err:
    print('ERROR: ', err)

ERROR:  Dimensions 3 and 4 are not compatible


### Chaining Transformations


In [34]:
# repeat(3) concatenates three passes over the 10 elements.
dataset = tf.data.Dataset.from_tensor_slices(tf.range(10)).repeat(3)
print(list(dataset.as_numpy_iterator()))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [38]:
# 30 elements in batches of 8: the final batch keeps the 6 leftovers.
list(dataset.batch(8).as_numpy_iterator())

[array([0, 1, 2, 3, 4, 5, 6, 7]),
 array([8, 9, 0, 1, 2, 3, 4, 5]),
 array([6, 7, 8, 9, 0, 1, 2, 3]),
 array([4, 5, 6, 7, 8, 9])]

In [79]:
dataset = tf.data.Dataset.from_tensor_slices(tf.range(10)).repeat(200)
# take(20) reads only the elements we display instead of materialising all 2,000.
print(list(dataset.take(20).as_numpy_iterator()))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [80]:
# 10 values repeated 200 times -> 2,000 elements.
len(dataset)

2000

In [81]:
# 2,000 elements / 8 = 250 full batches, so drop_remainder discards nothing here.
batched_data = dataset.batch(batch_size=8, drop_remainder=True)

`num_parallel_calls`
(Optional.) A `tf.int64` scalar `tf.Tensor` representing the number of batches to compute asynchronously in parallel. If not specified, batches are computed sequentially. If `tf.data.AUTOTUNE` is used, the number of parallel calls is set dynamically based on available resources.


In [82]:
# Parallel variant of `batched_data`; AUTOTUNE lets tf.data pick the degree of parallelism.
parallel_batched_data = dataset.batch(
    batch_size=8,
    drop_remainder=True,
    num_parallel_calls=tf.data.AUTOTUNE,
)

In [85]:
# take(4) computes only the batches shown rather than all 250.
list(parallel_batched_data.take(4).as_numpy_iterator())

[array([0, 1, 2, 3, 4, 5, 6, 7]),
 array([8, 9, 0, 1, 2, 3, 4, 5]),
 array([6, 7, 8, 9, 0, 1, 2, 3]),
 array([4, 5, 6, 7, 8, 9, 0, 1])]

#### map


In [93]:
# Triple every element of every batch; take(4) only computes the batches displayed.
mapped_data = batched_data.map(lambda x: x * 3)
list(mapped_data.take(4).as_numpy_iterator())

[array([ 0,  3,  6,  9, 12, 15, 18, 21]),
 array([24, 27,  0,  3,  6,  9, 12, 15]),
 array([18, 21, 24, 27,  0,  3,  6,  9]),
 array([12, 15, 18, 21, 24, 27,  0,  3])]

In [95]:
def exp_func(x):
    """Element-wise exponential of an integer batch, returned as float32.

    tf.exp needs a floating-point input, so the batch is cast explicitly with
    tf.cast rather than relying on AutoGraph to translate the Python built-in
    float() inside the traced map function.
    """
    return tf.exp(tf.cast(x, tf.float32))

mapped_data = batched_data.map(exp_func)
list(mapped_data.take(4).as_numpy_iterator())

[array([1.0000000e+00, 2.7182817e+00, 7.3890562e+00, 2.0085537e+01,
        5.4598148e+01, 1.4841316e+02, 4.0342877e+02, 1.0966332e+03],
       dtype=float32),
 array([2.9809580e+03, 8.1030835e+03, 1.0000000e+00, 2.7182817e+00,
        7.3890562e+00, 2.0085537e+01, 5.4598148e+01, 1.4841316e+02],
       dtype=float32),
 array([4.0342877e+02, 1.0966332e+03, 2.9809580e+03, 8.1030835e+03,
        1.0000000e+00, 2.7182817e+00, 7.3890562e+00, 2.0085537e+01],
       dtype=float32),
 array([5.4598148e+01, 1.4841316e+02, 4.0342877e+02, 1.0966332e+03,
        2.9809580e+03, 8.1030835e+03, 1.0000000e+00, 2.7182817e+00],
       dtype=float32)]

#### filter


In [122]:
# Keep elements greater than 5; take(5) stops after the first five matches.
filtered_data = dataset.filter(lambda x: x > 5)
list(filtered_data.take(5).as_numpy_iterator())

[6, 7, 8, 9, 6]

In [124]:
# filter() needs one scalar tf.bool per element; on batches `x > 5` is a vector -> error.
try:
    filtered_data = batched_data.filter(lambda x: x > 5)
    list(filtered_data.take(5).as_numpy_iterator())
except Exception as err:
    print("ERROR", err)


Invalid `predicate`. `predicate` must return a `tf.bool` scalar tensor, but its return type is TensorSpec(shape=(8,), dtype=tf.bool, name=None).


In [134]:
# Keep only batches containing a 0: reduce_min turns the batch into a scalar predicate.
filtered_data = batched_data.filter(lambda x: tf.reduce_min(x) == 0)
list(filtered_data.take(5).as_numpy_iterator())

[array([0, 1, 2, 3, 4, 5, 6, 7]),
 array([8, 9, 0, 1, 2, 3, 4, 5]),
 array([6, 7, 8, 9, 0, 1, 2, 3]),
 array([4, 5, 6, 7, 8, 9, 0, 1]),
 array([0, 1, 2, 3, 4, 5, 6, 7])]

In [140]:
# take() truncates the dataset itself, so nothing beyond 3 batches is computed.
list(filtered_data.take(count=3).as_numpy_iterator())

[array([0, 1, 2, 3, 4, 5, 6, 7]),
 array([8, 9, 0, 1, 2, 3, 4, 5]),
 array([6, 7, 8, 9, 0, 1, 2, 3])]

#### Shuffle


`buffer_size`
A `tf.int64` scalar `tf.Tensor` representing the number of elements from this dataset from which the new dataset will sample. To shuffle the entire dataset uniformly, use `buffer_size=dataset.cardinality()`.

Randomly shuffles the elements of this dataset.

This dataset fills a buffer with buffer_size elements, then randomly samples elements from this buffer, replacing the selected elements with new elements. For perfect shuffling, a buffer size greater than or equal to the full size of the dataset is required.

For instance, if your dataset contains 10,000 elements but `buffer_size` is set to 1,000, then `shuffle` will initially select a random element from only the first 1,000 elements in the buffer. Once an element is selected, its space in the buffer is replaced by the next (i.e., 1,001st) element, maintaining the 1,000-element buffer.


In [157]:
# A small buffer (4) only mixes nearby elements; a fixed seed with no reshuffle makes it repeatable.
dataset = (
    tf.data.Dataset.range(10)
    .repeat(2)
    .shuffle(buffer_size=4, seed=42, reshuffle_each_iteration=False)
    .batch(7)
)
list(dataset.as_numpy_iterator())

[array([0, 1, 3, 4, 5, 2, 6], dtype=int64),
 array([0, 9, 1, 7, 2, 3, 8], dtype=int64),
 array([5, 8, 7, 9, 4, 6], dtype=int64)]

### Interleaving Lines from Multiple Files


In [1]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

housing = fetch_california_housing()
# Split off a test set, then split the remainder into training and validation sets.
# The target is reshaped into a column vector of shape (n_samples, 1).
X_train_full, X_test, y_train_full, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_full, y_train_full, random_state=42)

In [7]:
import numpy as np
from pathlib import Path

def save_to_csv_files(data, name_prefix, header=None, n_parts=10,
                      output_dir=None):
    """Split the rows of a 2-D array into ``n_parts`` CSV files.

    Files are named ``my_{name_prefix}_{idx:02d}.csv`` and written to
    ``output_dir`` (default: ``datasets/housing`` relative to the current
    working directory), which is created if it does not exist.

    Args:
        data: 2-D array-like; each row becomes one comma-separated line.
        name_prefix: Inserted into every file name (e.g. "train").
        header: Optional header line written at the top of every file.
        n_parts: Number of files; rows are split into contiguous,
            near-equal chunks with ``np.array_split``.
        output_dir: Optional destination directory (str or Path).

    Returns:
        List of the written file paths, as strings, in chunk order.
    """
    if output_dir is None:
        housing_dir = Path() / 'datasets' / 'housing'
    else:
        housing_dir = Path(output_dir)
    housing_dir.mkdir(parents=True, exist_ok=True)
    filename_format = "my_{}_{:02d}.csv"

    filepaths = []
    chunks = np.array_split(np.arange(len(data)), n_parts)
    for file_idx, row_indices in enumerate(chunks):
        part_csv = housing_dir / filename_format.format(name_prefix, file_idx)
        filepaths.append(str(part_csv))
        with open(part_csv, 'w') as f:
            if header is not None:
                f.write(header)
                f.write('\n')
            for row_idx in row_indices:
                # str(), not repr(): under NumPy >= 2, repr(np.float64(1.5))
                # is 'np.float64(1.5)', which would corrupt the CSV.
                f.write(','.join(str(col) for col in data[row_idx]))
                f.write('\n')
    return filepaths

In [8]:
# Append the target as the last column of each split.
train_data = np.c_[X_train,y_train]
valid_data = np.c_[X_valid, y_valid]
test_data = np.c_[X_test,y_test]

# Same header in every file: the feature names followed by the target column.
header_cols = housing.feature_names + ['MedianHouseValue']
header = ','.join(header_cols)

# Shard each split into several CSV files (20 for train, 10 for valid and test).
train_filepaths = save_to_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_csv_files(test_data, "test", header, n_parts=10)

In [9]:
# Paths returned by save_to_csv_files (relative to the working directory).
train_filepaths

['datasets\\housing\\my_train_00.csv',
 'datasets\\housing\\my_train_01.csv',
 'datasets\\housing\\my_train_02.csv',
 'datasets\\housing\\my_train_03.csv',
 'datasets\\housing\\my_train_04.csv',
 'datasets\\housing\\my_train_05.csv',
 'datasets\\housing\\my_train_06.csv',
 'datasets\\housing\\my_train_07.csv',
 'datasets\\housing\\my_train_08.csv',
 'datasets\\housing\\my_train_09.csv',
 'datasets\\housing\\my_train_10.csv',
 'datasets\\housing\\my_train_11.csv',
 'datasets\\housing\\my_train_12.csv',
 'datasets\\housing\\my_train_13.csv',
 'datasets\\housing\\my_train_14.csv',
 'datasets\\housing\\my_train_15.csv',
 'datasets\\housing\\my_train_16.csv',
 'datasets\\housing\\my_train_17.csv',
 'datasets\\housing\\my_train_18.csv',
 'datasets\\housing\\my_train_19.csv']

### Building an Input Pipeline


In [14]:
# list_files returns the paths in a shuffled order (seeded); show only the first three.
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
list(filepath_dataset.take(3).as_numpy_iterator())

[b'datasets\\housing\\my_train_05.csv',
 b'datasets\\housing\\my_train_16.csv',
 b'datasets\\housing\\my_train_01.csv']

In [15]:
n_readers = 5
# Alternate between n_readers files at a time; skip(1) drops each file's CSV header row.
dataset = filepath_dataset.interleave(
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=n_readers,
)

In [16]:
# Peek at the first five raw CSV lines (header rows already skipped).
for raw_line in dataset.take(5):
    print(raw_line)

tf.Tensor(b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418', shape=(), dtype=string)
tf.Tensor(b'2.4792,24.0,3.4547038327526134,1.1341463414634145,2251.0,3.921602787456446,34.18,-118.38,2.0', shape=(), dtype=string)
tf.Tensor(b'4.2708,45.0,5.121387283236994,0.953757225433526,492.0,2.8439306358381504,37.48,-122.19,2.67', shape=(), dtype=string)
tf.Tensor(b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205', shape=(), dtype=string)
tf.Tensor(b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215', shape=(), dtype=string)


### Preprocessing the Data


In [18]:
from sklearn.preprocessing import StandardScaler

# Fit on the training split only; its mean_/scale_ are reused to standardize CSV lines.
scaler = StandardScaler()
scaler.fit(X_train)

In [22]:
# Per-feature statistics learned from the training set (used by `preprocess`).
X_mean, X_std = scaler.mean_, scaler.scale_
n_inputs = X_train.shape[1]  # number of feature columns

def parse_csv_line(line):
    """Decode one CSV line into a (features, target) pair of float32 tensors.

    The first ``n_inputs`` fields default to 0.0 when empty. The last field
    (the target) gets an empty default tensor, which tf.io.decode_csv treats
    as a required column.
    """
    feature_defaults = [0.] * n_inputs
    target_default = tf.constant([], dtype=tf.float32)
    fields = tf.io.decode_csv(line, record_defaults=feature_defaults + [target_default])
    features = tf.stack(fields[:n_inputs])
    target = tf.stack(fields[n_inputs:])
    return features, target

def preprocess(line):
    """Parse a CSV line and standardize its features with the training-set mean/std."""
    features, target = parse_csv_line(line)
    scaled_features = (features - X_mean) / X_std
    return scaled_features, target

In [23]:
# Sanity check: parse and standardize a single raw CSV line given as bytes.
preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')


(<tf.Tensor: shape=(8,), dtype=float32, numpy=
 array([ 0.16579159,  1.216324  , -0.05204564, -0.39215982, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

### Putting Everything Together + Prefetching


In [24]:
def csv_reader_dataset(filepaths, n_readers=5, n_read_threads=None,
                       n_parse_threads=5, shuffle_buffer_size=10_000, seed=42,
                       batch_size=32):
    """Build a shuffled, batched, prefetched tf.data pipeline over CSV files.

    Pipeline: list the file paths (seeded), interleave lines from
    ``n_readers`` files at a time while skipping each file's header row,
    parse and standardize every line with ``preprocess``, shuffle the
    examples, batch them, and prefetch one batch ahead.
    """
    def read_lines_without_header(filepath):
        return tf.data.TextLineDataset(filepath).skip(1)

    files = tf.data.Dataset.list_files(filepaths, seed=seed)
    lines = files.interleave(read_lines_without_header,
                             cycle_length=n_readers,
                             num_parallel_calls=n_read_threads)
    examples = lines.map(preprocess, num_parallel_calls=n_parse_threads)
    shuffled = examples.shuffle(shuffle_buffer_size, seed=seed)
    return shuffled.batch(batch_size).prefetch(1)

In [25]:
# Peek at two small batches (batch_size=3) from the training pipeline.
example_set = csv_reader_dataset(train_filepaths, batch_size=3)
for X_batch, y_batch in example_set.take(2):
    print("X =", X_batch)
    print("y =", y_batch)
    print()

X = tf.Tensor(
[[-1.3957452  -0.04940685 -0.22830808  0.22648273  2.2593622   0.35200632
   0.9667386  -1.4121602 ]
 [ 2.7112627  -1.0778131   0.69413143 -0.14870553  0.51810503  0.3507294
  -0.82285154  0.80680597]
 [-0.13484643 -1.868895    0.01032507 -0.13787179 -0.12893449  0.03143518
   0.2687057   0.13212144]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[1.819]
 [3.674]
 [0.954]], shape=(3, 1), dtype=float32)

X = tf.Tensor(
[[ 0.09031774  0.9789995   0.1327582  -0.13753782 -0.23388447  0.10211545
   0.97610843 -1.4121602 ]
 [ 0.05218809 -2.0271113   0.2940109  -0.02403445  0.16218767 -0.02844518
   1.4117942  -0.93737936]
 [-0.672276    0.02970133 -0.76922584 -0.15086786  0.4962024  -0.02741998
  -0.7853724   0.77182245]], shape=(3, 8), dtype=float32)
y = tf.Tensor(
[[2.725]
 [1.205]
 [1.625]], shape=(3, 1), dtype=float32)



In [26]:
# One pipeline per split, all with csv_reader_dataset's default settings.
# NOTE(review): validation/test examples are shuffled too; harmless for evaluation, just unneeded work.
train_set = csv_reader_dataset(train_filepaths)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)

In [27]:

# extra code – for reproducibility
# Clear state left by previously built Keras models, then fix TensorFlow's global random seed.
tf.keras.backend.clear_session()
tf.random.set_seed(42)

In [28]:
# A one-hidden-layer regressor trained directly on the tf.data pipelines.
# An explicit Input replaces the `input_shape=` layer argument, which newer
# Keras versions deprecate.
model = tf.keras.Sequential([
    tf.keras.Input(shape=X_train.shape[1:]),
    tf.keras.layers.Dense(30, activation="relu", kernel_initializer="he_normal"),
    tf.keras.layers.Dense(1),
])
model.compile(loss="mse", optimizer="sgd")
model.fit(train_set, validation_data=valid_set, epochs=5)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.src.callbacks.History at 0x14f7519ee00>

In [29]:
# extra code – defines the optimizer and loss function for training
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
loss_fn = tf.keras.losses.mean_squared_error

n_epochs = 5
for epoch in range(n_epochs):
    # Progress line once per epoch (it used to be reprinted for every batch).
    print("\rEpoch {}/{}".format(epoch + 1, n_epochs), end="")
    for X_batch, y_batch in train_set:
        # extra code – perform one Gradient Descent step
        #              as explained in Chapter 12
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            # add any regularization losses the model tracks
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

Epoch 5/5

In [30]:
@tf.function
def train_one_epoch(model, optimizer, loss_fn, train_set):
    """Run one pass over ``train_set``, applying one optimizer step per batch.

    Decorated with tf.function, so the whole epoch is traced into a graph.
    The loss is the batch-mean of ``loss_fn`` plus any losses in ``model.losses``.
    """
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            predictions = model(X_batch)
            batch_loss = tf.reduce_mean(loss_fn(y_batch, predictions))
            total_loss = tf.add_n([batch_loss] + model.losses)
        trainable = model.trainable_variables
        grads = tape.gradient(total_loss, trainable)
        optimizer.apply_gradients(zip(grads, trainable))

# Fresh optimizer for the compiled loop; n_epochs comes from the eager-loop cell.
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
loss_fn = tf.keras.losses.mean_squared_error
for epoch_number in range(1, n_epochs + 1):
    print("\rEpoch {}/{}".format(epoch_number, n_epochs), end="")
    train_one_epoch(model, optimizer, loss_fn, train_set)

Epoch 5/5

### The TFRecord Format
