# Efficient Data Pipelines with tf.data
- Earlier chapters focused on building, training, and evaluating computer vision models and on understanding their architectures.
- In many production systems, the bottleneck is not the model but the data pipeline.
- If data cannot be delivered to the model fast enough, GPUs sit idle, training slows down, and costs increase.

In this notebook, we will:
- Explain why input pipelines can become ML bottlenecks.
- Build efficient data pipelines using tf.data.
- Use map() with parallel calls, cache(), shuffle(), batch(), and prefetch().
- Measure and compare pipeline performance.
- Think like an ML engineer who optimizes system throughput, not just model accuracy.

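Each step serves a purpose:
- map() → transforms each element (here: resize and normalize)
- cache() → stores transformed elements after the first pass, so later epochs skip the recomputation
- shuffle() → randomizes the sample order, which helps generalization
- batch() → groups samples so the model processes them in one vectorized step
- prefetch() → overlaps preprocessing of upcoming batches with training on the current one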

Combined in a sensible order, these steps can substantially improve throughput.
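To make the five steps concrete before touching real images, here is a minimal sketch on a toy dataset. The integers 0..9, the doubling function, and the batch size of 4 are assumptions chosen for illustration, not part of the notebook's flower pipeline.

```python
import tensorflow as tf

# Toy stand-in for an image dataset: the integers 0..9.
ds = tf.data.Dataset.range(10)

pipeline = (
    ds
    .map(lambda x: x * 2, num_parallel_calls=tf.data.AUTOTUNE)  # transform each element
    .cache()                     # keep transformed elements in memory after the first pass
    .shuffle(10, seed=0)         # buffer covers the whole toy dataset
    .batch(4)                    # group elements; the last batch holds the leftovers
    .prefetch(tf.data.AUTOTUNE)  # prepare the next batch while the current one is consumed
)

batches = [b.numpy().tolist() for b in pipeline]
batch_sizes = [len(b) for b in batches]
all_values = sorted(v for b in batches for v in b)

print("batch sizes:", batch_sizes)
print("values seen:", all_values)
```

Shuffling changes the order in which values appear, but every transformed value should still show up exactly once per pass.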

This notebook lays the foundation for:
- 07b – TensorFlow performance best practices
- 07c – Model export and versioning
- 07d – Distributed training concepts


In [8]:
import tensorflow as tf
import tensorflow_datasets as tfds
import time
import matplotlib.pyplot as plt

print("TensorFlow version:", tf.__version__)

# Load Dataset

IMG_SIZE = 128
BATCH_SIZE = 32

(ds_train, ds_val), ds_info = tfds.load(
    "tf_flowers",
    split=["train[:80%]", "train[80%:]"],
    as_supervised=True,
    with_info=True
)

NUM_CLASSES = ds_info.features["label"].num_classes

# Preprocessing Function

def preprocess(image, label):
    image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
    image = image / 255.0
    return image, label


TensorFlow version: 2.9.1
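Before wiring preprocess into a pipeline, it can help to check what it returns for a single input. The sketch below copies the function from the cell above and feeds it a synthetic uint8 image; the 200x300 size and the label value are arbitrary assumptions made for illustration.

```python
import tensorflow as tf

IMG_SIZE = 128  # same value as in the notebook

def preprocess(image, label):
    # Copy of the notebook's preprocessing function.
    image = tf.image.resize(image, (IMG_SIZE, IMG_SIZE))
    image = image / 255.0
    return image, label

# Synthetic uint8 "photo" with an arbitrary, non-square size (illustrative only).
fake_image = tf.random.uniform((200, 300, 3), maxval=256, dtype=tf.int32)
fake_image = tf.cast(fake_image, tf.uint8)
fake_label = tf.constant(3, dtype=tf.int64)

out_image, out_label = preprocess(fake_image, fake_label)

out_shape = tuple(out_image.shape)
out_dtype = out_image.dtype
out_min = float(tf.reduce_min(out_image))
out_max = float(tf.reduce_max(out_image))

print("shape:", out_shape)
print("dtype:", out_dtype.name)
print("value range:", out_min, "to", out_max)
```

With the default bilinear method, tf.image.resize returns float32, so the division by 255.0 yields values in roughly the [0, 1] range regardless of the input image size.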


In [9]:
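# Baseline Pipeline (Slow)
# Sequential map, no caching, no prefetching: every epoch repeats all preprocessing.

ds_train_slow = (
    ds_train
    .map(preprocess)      # one element at a time
    .batch(BATCH_SIZE)
)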
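The baseline calls map() sequentially. One common concern about switching to parallel calls is whether the element order changes. The sketch below, on a toy range dataset with a hypothetical transform function, suggests that under tf.data's default deterministic setting a parallel map yields the same elements in the same order.

```python
import tensorflow as tf

ds = tf.data.Dataset.range(100)

def transform(x):
    # Stand-in for per-element preprocessing.
    return x * x

sequential = ds.map(transform)
parallel = ds.map(transform, num_parallel_calls=tf.data.AUTOTUNE)

seq_values = [int(v) for v in sequential.as_numpy_iterator()]
par_values = [int(v) for v in parallel.as_numpy_iterator()]

print("same elements, same order:", seq_values == par_values)
```

So adding num_parallel_calls is usually a throughput change only; the data the model sees stays the same unless determinism is explicitly relaxed.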


In [10]:
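# Optimized Pipeline

ds_train_fast = (
    ds_train
    .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)  # preprocess elements in parallel
    .cache()                     # keep preprocessed images in memory after the first epoch
    .shuffle(1000)               # placed after cache(), so each epoch gets a fresh order
    .batch(BATCH_SIZE)
    .prefetch(tf.data.AUTOTUNE)  # prepare upcoming batches while the current one is in use
)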
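To see what cache() saves, one option is to count how often the mapped function actually runs. The sketch below is an illustration under assumptions: count_map_calls, py_square, and the 5-element toy dataset are hypothetical names and values, and tf.py_function is used only so that a plain Python counter can observe each call.

```python
import tensorflow as tf

def count_map_calls(use_cache, epochs=3, n=5):
    """Count executions of the mapped function over several epochs."""
    calls = {"n": 0}

    def py_square(x):
        calls["n"] += 1  # runs in Python, once per mapped element
        return x * x

    def tf_square(x):
        y = tf.py_function(py_square, [x], tf.int64)
        y.set_shape(())
        return y

    ds = tf.data.Dataset.range(n).map(tf_square)
    if use_cache:
        ds = ds.cache()

    for _ in range(epochs):
        for _ in ds:
            pass
    return calls["n"]

calls_without_cache = count_map_calls(use_cache=False)
calls_with_cache = count_map_calls(use_cache=True)

print("map calls without cache:", calls_without_cache)
print("map calls with cache:   ", calls_with_cache)
```

Without cache() the function runs again for every element in every epoch; with cache() it should run only during the first full pass. The in-memory cache only pays off when the preprocessed dataset fits in RAM.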


In [14]:
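# Timing Comparison

def benchmark(dataset, epochs=3):
    """Time `epochs` full passes over `dataset` (input pipeline only, no model)."""
    start = time.perf_counter()  # monotonic clock, better suited to interval timing
    for _ in range(epochs):
        for _ in dataset:
            pass
    return time.perf_counter() - start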

slow_time = benchmark(ds_train_slow)
fast_time = benchmark(ds_train_fast)

print(f"Slow pipeline time: {slow_time:.2f} sec")
print(f"Fast pipeline time: {fast_time:.2f} sec")


Slow pipeline time: 0.26 sec
Fast pipeline time: 0.17 sec
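The totals above combine all epochs into one number. For a cached pipeline, the first pass also fills the cache, so it can be informative to time each epoch separately. The following is a minimal sketch on a toy dataset; benchmark_per_epoch is a hypothetical helper, not part of the notebook, and absolute timings will vary by machine.

```python
import time
import tensorflow as tf

def benchmark_per_epoch(dataset, epochs=3):
    """Return a (seconds, num_batches) pair for each full pass over `dataset`."""
    results = []
    for _ in range(epochs):
        start = time.perf_counter()
        num_batches = 0
        for _ in dataset:
            num_batches += 1
        results.append((time.perf_counter() - start, num_batches))
    return results

# Toy pipeline standing in for the image pipeline (illustrative values only).
toy = (
    tf.data.Dataset.range(1000)
    .map(lambda x: tf.cast(x, tf.float32) / 255.0, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

per_epoch = benchmark_per_epoch(toy)
for epoch, (seconds, num_batches) in enumerate(per_epoch, start=1):
    print(f"epoch {epoch}: {num_batches} batches in {seconds:.4f} sec")
```

Comparing the first epoch with the later ones separates the one-time cost of filling the cache from the steady-state throughput that matters during longer training runs.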


In [15]:
# Simple Model

model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, 3, activation="relu", input_shape=(IMG_SIZE, IMG_SIZE, 3)),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, activation="relu"),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dense(NUM_CLASSES, activation="softmax")
])

model.compile(
    optimizer="adam",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)


In [16]:
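# Train
# Validate on the held-out split (ds_val), not on batches taken from the training set.

ds_val_fast = ds_val.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
ds_val_fast = ds_val_fast.batch(BATCH_SIZE).cache().prefetch(tf.data.AUTOTUNE)

history = model.fit(
    ds_train_fast,
    validation_data=ds_val_fast,
    epochs=3
)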


Epoch 1/3
Epoch 2/3
Epoch 3/3
