# How to Use Data Pipelines with Python

An important step in your workflow is data ingestion. Before you build a model, the data has to be in the correct format. The series of steps that gets it there is called a data pipeline. In this chapter, I am going to show how to handle the file structure.

## What Is a Data Pipeline?

To analyze text data, you need to organize the directory structure correctly. For example, for text classification you have to organize your training texts into one directory per class. If you classify a dataset as positive and negative, your directory structure can look as follows:
pos
    p1.txt
    p2.txt
neg 
    n1.txt
    n2.txt
Let's take the Internet Movie Database (IMDB) dataset as an example and classify movie reviews as positive or negative.
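
The one-directory-per-class layout can be sketched with the standard library alone. The snippet below builds it in a temporary directory and derives the class labels from the directory names. The file names and review texts are placeholders, not files from the IMDB dataset.

```python
import os
import tempfile

# Hypothetical sample texts, keyed by class directory and file name.
samples = {
    "pos": {"p1.txt": "A wonderful film.", "p2.txt": "Great acting."},
    "neg": {"n1.txt": "A dull plot.", "n2.txt": "Not worth watching."},
}

root = tempfile.mkdtemp()
for label, files in samples.items():
    class_dir = os.path.join(root, label)
    os.makedirs(class_dir)
    for name, text in files.items():
        with open(os.path.join(class_dir, name), "w", encoding="utf-8") as f:
            f.write(text)

# Each subdirectory name is a class label; sorting fixes the label order.
class_names = sorted(
    entry for entry in os.listdir(root)
    if os.path.isdir(os.path.join(root, entry))
)
file_counts = {name: len(os.listdir(os.path.join(root, name))) for name in class_names}
print(class_names, file_counts)
```

When text_dataset_from_directory infers labels, it also sorts the class names alphanumerically, so neg maps to 0 and pos maps to 1.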

### Downloading Text Data

First of all, let me import the libraries.

In [1]:
import io
import os
import re
import shutil
import string
import tensorflow as tf

Next, I am going to load the IMDb dataset. To download it directly, you can use the get_file() function. To do this, let me create a url variable.

In [2]:
url = "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"

Then let me call get_file() as follows:

In [3]:
ds = tf.keras.utils.get_file("aclImdb_v1.tar.gz",
                             url, untar=True,
                             cache_dir='.',
                             cache_subdir='')

The dataset is now downloaded, and a directory called aclImdb has been created in the current directory. I am going to create a variable that holds this path.

In [4]:
data_dir = os.path.join(os.path.dirname(ds), 'aclImdb')
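
For readers who want to see what the untar=True step amounts to, here is a minimal standard-library sketch. It does not download anything and is not how get_file is implemented. It packs a tiny stand-in archive (the archive name and file contents are hypothetical) and extracts it, the way aclImdb_v1.tar.gz expands into an aclImdb directory.

```python
import os
import tarfile
import tempfile

work_dir = tempfile.mkdtemp()

# Build a tiny stand-in archive so the example runs offline.
src_dir = os.path.join(work_dir, "src", "aclImdb", "train", "pos")
os.makedirs(src_dir)
with open(os.path.join(src_dir, "p1.txt"), "w", encoding="utf-8") as f:
    f.write("A wonderful film.")

archive_path = os.path.join(work_dir, "aclImdb_demo.tar.gz")
with tarfile.open(archive_path, "w:gz") as tar:
    # arcname keeps the paths inside the archive relative to "aclImdb".
    tar.add(os.path.join(work_dir, "src", "aclImdb"), arcname="aclImdb")

# Extract into a separate directory; the archive's top-level folder
# becomes extract_dir/aclImdb.
extract_dir = os.path.join(work_dir, "extracted")
os.makedirs(extract_dir)
with tarfile.open(archive_path, "r:gz") as tar:
    tar.extractall(extract_dir)

data_dir = os.path.join(extract_dir, "aclImdb")
extracted_file = os.path.join(data_dir, "train", "pos", "p1.txt")
print(os.path.exists(extracted_file))
```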

Let's take a look inside the training directory.

In [5]:
train_dir = os.path.join(data_dir, 'train')
os.listdir(train_dir)

['labeledBow.feat',
 'neg',
 'pos',
 'unsup',
 'unsupBow.feat',
 'urls_neg.txt',
 'urls_pos.txt',
 'urls_unsup.txt']

There is a directory called unsup. It holds unlabeled reviews that I don't need here, and if it stayed in place it would be treated as a third class. I want to remove this directory.

In [6]:
unused_dir = os.path.join(train_dir, 'unsup')
shutil.rmtree(unused_dir)

Let me take another look at train_dir.

In [7]:
os.listdir(train_dir)

['labeledBow.feat',
 'neg',
 'pos',
 'unsupBow.feat',
 'urls_neg.txt',
 'urls_pos.txt',
 'urls_unsup.txt']

As you can see, unsup has been removed from the data directory. Make sure that the only subdirectories left are the ones whose names are used as labels.

Let me look at the contents of the training directory.

In [8]:
ls aclImdb\train

 Volume in drive C is Windows
 Volume Serial Number is A065-837B

 Directory of C:\TiAk\My-Notebooks\TensorFlow 2 Pocket Reference\Chapter-4\aclImdb\train

26.08.2021  18:16    <DIR>          .
26.08.2021  18:16    <DIR>          ..
12.04.2011  20:17        21.021.197 labeledBow.feat
12.04.2011  12:47    <DIR>          neg
12.04.2011  12:47    <DIR>          pos
12.04.2011  20:22        41.348.699 unsupBow.feat
12.04.2011  12:48           612.500 urls_neg.txt
12.04.2011  12:48           612.500 urls_pos.txt
12.04.2011  12:47         2.450.000 urls_unsup.txt
               5 File(s)     66.044.896 bytes
               4 Dir(s)  119.645.503.488 bytes free


As you can see, only the pos and neg directories remain.
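
The ls output above comes from a Windows shell. A platform-independent check, sketched below with the standard library, is to list only the subdirectories, since those are what become class labels. The example runs on a temporary stand-in directory that mimics a few of the entries of train_dir, so it does not touch the real dataset. The helper label_dirs is hypothetical.

```python
import os
import shutil
import tempfile

train_dir = tempfile.mkdtemp()

# Recreate a few entry names from train_dir (contents are empty stand-ins).
for name in ("neg", "pos", "unsup"):
    os.makedirs(os.path.join(train_dir, name))
for name in ("labeledBow.feat", "urls_neg.txt", "urls_pos.txt"):
    open(os.path.join(train_dir, name), "w").close()

def label_dirs(path):
    """Return the sorted names of the subdirectories of path."""
    return sorted(e.name for e in os.scandir(path) if e.is_dir())

before = label_dirs(train_dir)

# Remove the directory that should not become a class.
shutil.rmtree(os.path.join(train_dir, "unsup"))
after = label_dirs(train_dir)

print(before, after)
```

Plain files such as urls_pos.txt do not matter here, because only directories are interpreted as classes.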

### Creating the Data Pipeline

Now I am going to create the pipeline. First, let me create a few variables:

In [9]:
batch_size = 1024
seed = 123

I want to generate a tf.data.Dataset from the text files in a directory.

In [10]:
train_ds = tf.keras.preprocessing.text_dataset_from_directory(
    'aclImdb/train', 
    batch_size=batch_size, 
    validation_split=0.2,
    subset='training', 
    seed=seed)

Found 25000 files belonging to 2 classes.
Using 20000 files for training.


So I created the training dataset. To fine-tune hyperparameters, I'll use a validation dataset. Let's create it.

In [11]:
val_ds = tf.keras.preprocessing.text_dataset_from_directory(
    'aclImdb/train', 
    batch_size=batch_size, 
    validation_split=0.2,
    subset='validation', 
    seed=seed)

Found 25000 files belonging to 2 classes.
Using 5000 files for validation.
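
Both calls use the same seed and validation_split so that the training and validation subsets come from the same shuffled order and do not overlap. The idea can be illustrated with a plain-Python sketch. This is not the Keras implementation, and the helper split_files and the file names are hypothetical.

```python
import random

def split_files(files, validation_split, subset, seed):
    """Shuffle deterministically, then hold out the last fraction for validation."""
    shuffled = list(files)
    random.Random(seed).shuffle(shuffled)
    n_val = int(validation_split * len(shuffled))
    if subset == "validation":
        return shuffled[-n_val:]
    return shuffled[:-n_val]

files = [f"review_{i}.txt" for i in range(25000)]  # stand-ins for the 25000 files
train_files = split_files(files, 0.2, "training", seed=123)
val_files = split_files(files, 0.2, "validation", seed=123)

print(len(train_files), len(val_files))         # sizes of the two subsets
print(set(train_files).isdisjoint(val_files))   # whether any file is in both
```

With different seeds, the two subsets would be cut from different orders and could share files, which would leak training data into validation.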


### Inspecting the Dataset

I am going to inspect the content of these files. Let's randomly select five rows from the first batch and print them out.

In [12]:
import random
# Pick five random positions within the first batch
idx = random.sample(range(batch_size), 5)
for text_batch, label_batch in train_ds.take(1):
    for i in idx:
        print(label_batch[i].numpy(), text_batch.numpy()[i])

1 b'Clint Eastwood has definitely produced better movies than this, but this one does not embarrass him. Dirty Harry catches everyone\'s attention and unless one wants to watch romance, there is no reason why you won\'t like him. He is cool because he is dirty, is great because he kills without much thinking, is perfect because he gets the bullet right through your heart and a hero because he doesn\'t care.<br /><br />From what I have seen in movies in which Eastwood acts, the character of the lead role always captivates the audience. In White Hunter Black heart, he is the crazy director, in "in the Line of Fire" he is the "Old \'un" while here is the "almost" jobless with his job, that is to say he makes work for himself, doesn\'t care one damn about his superiors who practically send him out for a vacation.<br /><br />Based on a rape victim, this movie is promising for all the "no non-sense" movie watchers. The movie has nothing that goes away from he central plot. However, what make

In this section, I showed how to deal with text datasets. 

## The Data Pipeline for Image Datasets

In this section, I am going to show how to build a data ingestion pipeline for image data.

Sometimes, all your images are in the same directory, and the labels are stored in a separate file. This file includes two columns: one with all the filenames and one with the labels.
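
As an illustration of such a label file, the sketch below parses a small CSV with the standard library. The column names file_name and label match the ones used later in this section. The rows themselves are made-up examples, not entries from the flower dataset.

```python
import csv
import io
from collections import Counter

# Hypothetical contents of a label file: one row per image.
csv_text = """file_name,label
img_001.jpg,daisy
img_002.jpg,tulips
img_003.jpg,daisy
"""

rows = list(csv.DictReader(io.StringIO(csv_text)))

# Map each file name to its label and count the images per class.
labels_by_file = {row["file_name"]: row["label"] for row in rows}
class_counts = Counter(row["label"] for row in rows)

print(labels_by_file["img_002.jpg"], dict(class_counts))
```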

### Downloading Images 

First, I am going to download the [flower dataset](https://data.mendeley.com/public-files/datasets/jxmfrvhpyz/files/283004ff-e529-4c3c-a1ee-4fb90024dc94/file_downloaded).

I am going to build a data pipeline to feed these images into an image classification model for training. Let me stream these images into the training process with ImageDataGenerator.

### Creating the Data Pipeline

Let me import the libraries that I will use in this section.

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

Note that the dataset has a label file. I am going to look at its contents using the pandas library.

In [None]:
traindf = pd.read_csv('flower_photos/all_labels.csv', dtype=str)
traindf.head()

### Preprocessing the Dataset

Now I am going to set some hyperparameters, which will be used later.

In [None]:
data_root = 'flower_photos/flowers'
IMAGE_SIZE = (224, 224)
TRAINING_DATA_DIR = str(data_root)
BATCH_SIZE = 32

I am going to normalize the dataset and reserve 20% of the images for validation. Let me put these settings in a dictionary.

In [None]:
datagen_kwargs = dict(
    rescale=1./255, 
    validation_split=.20)

To build the model, I am going to use a pre-built ResNet model. The ResNet model expects images with pixel dimensions of 224\*224, and I need to set the batch size and the resampling algorithm as well.

In [None]:
dataflow_kwargs = dict(
    target_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    interpolation="bilinear")

To feed the images into training, I am going to define a generator.

In [None]:
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    **datagen_kwargs)

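Let me create the data flow pipelines for training and validation.

In [None]:
train_generator = train_datagen.flow_from_dataframe(
    dataframe=traindf,
    directory=data_root,
    x_col="file_name",
    y_col="label",
    subset="training",
    seed=10,
    shuffle=True,
    class_mode="categorical",
    **dataflow_kwargs)
# The held-out 20% of the same dataframe, used later for validation
valid_generator = train_datagen.flow_from_dataframe(
    dataframe=traindf,
    directory=data_root,
    x_col="file_name",
    y_col="label",
    subset="validation",
    shuffle=False,
    class_mode="categorical",
    **dataflow_kwargs)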

### Inspecting the Dataset

Let me show some images from the dataset.

In [None]:
image_batch, label_batch = next(iter(train_generator))
# Map class indices back to label names for the plot titles
idx_labels = {v: k for k, v in train_generator.class_indices.items()}
fig, axes = plt.subplots(8, 4, figsize=(20, 40))
axes = axes.flatten()
for img, lbl, ax in zip(image_batch, label_batch, axes):
    ax.imshow(img)
    label_ = np.argmax(lbl)
    label = idx_labels[label_]
    ax.set_title(label)
    ax.axis('off')
plt.show()
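
The title lookup relies on turning a one-hot label back into a class name. A plain-Python sketch of that step follows. The class_indices dictionary here is a hypothetical stand-in for train_generator.class_indices, the five class names are assumed for illustration, and decode is a hypothetical helper.

```python
# Hypothetical stand-in for train_generator.class_indices (name -> index).
class_indices = {"daisy": 0, "dandelion": 1, "roses": 2, "sunflowers": 3, "tulips": 4}

# Invert the mapping so that an index can be turned back into a name.
idx_labels = {index: name for name, index in class_indices.items()}

def decode(one_hot):
    """Return the class name for a one-hot (or probability) vector."""
    best = max(range(len(one_hot)), key=lambda i: one_hot[i])  # argmax
    return idx_labels[best]

print(decode([0.0, 0.0, 1.0, 0.0, 0.0]))
```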

So the data ingestion pipeline is ready to use. Let's build and train the model.

In [None]:
mdl = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=IMAGE_SIZE + (3,)),
    hub.KerasLayer("https://tfhub.dev/tensorflow/resnet_50/feature_vector/1",
                   trainable=False),
    tf.keras.layers.Dense(5,
                          activation='softmax',
                          name='custom_class')])
mdl.build([None, 224, 224, 3])

Let's compile the model.

In [None]:
mdl.compile(
  optimizer=tf.keras.optimizers.SGD(learning_rate=0.005, momentum=0.9),
  # The last layer already applies softmax, so the loss receives probabilities, not logits
  loss=tf.keras.losses.CategoricalCrossentropy(label_smoothing=0.1),
  metrics=['accuracy'])
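
The label_smoothing=0.1 argument softens the one-hot targets before the loss is computed. Here is a minimal sketch of the arithmetic, assuming the usual formula y * (1 - s) + s / num_classes. The helper smooth_labels is hypothetical and only illustrates the idea.

```python
def smooth_labels(one_hot, smoothing):
    """Blend a one-hot target with a uniform distribution over the classes."""
    num_classes = len(one_hot)
    return [y * (1.0 - smoothing) + smoothing / num_classes for y in one_hot]

smoothed = smooth_labels([0.0, 0.0, 1.0, 0.0, 0.0], smoothing=0.1)
print([round(v, 2) for v in smoothed])
```

The targets still sum to 1, but the correct class no longer gets the full weight, which discourages over-confident predictions.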

I am going to train the model. 

In [None]:
steps_per_epoch = train_generator.samples // train_generator.batch_size
validation_steps = valid_generator.samples // valid_generator.batch_size

mdl.fit(
    train_generator,
    epochs=13, steps_per_epoch=steps_per_epoch,
    validation_data=valid_generator,
    validation_steps=validation_steps)
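
The integer division above determines how many batches make up one epoch. A quick sketch with hypothetical sample counts (the real values come from train_generator.samples and valid_generator.samples):

```python
# Hypothetical sample counts; the real ones come from the generators.
train_samples, valid_samples = 2936, 734
batch_size = 32

# Floor division counts only the full batches.
steps_per_epoch = train_samples // batch_size
validation_steps = valid_samples // batch_size

# Samples beyond the last full training batch.
leftover_train = train_samples % batch_size
print(steps_per_epoch, validation_steps, leftover_train)
```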

That's it. As you can see, the training and validation image generators are passed into the training process.

In this section, I showed how to build a data ingestion pipeline for image data.

## Data Pipeline for NumPy Array Datasets

In this section, I am going to show how to create a data pipeline from a NumPy array. To do this, I am going to use the from_tensor_slices method.

Let's use the Fashion MNIST dataset, which consists of grayscale images of 10 types of garments. The images are represented as NumPy arrays instead of a typical image format, such as JPEG or PNG.

You can easily download it using the tf.keras API.

### Loading the Dataset

First of all, let me import the libraries.

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

Let's load the datasets using the load_data function in the tf.keras API.

In [None]:
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

Let me take a look at the structure of the datasets.

In [None]:
print(type(train_images), type(train_labels))

As you can see, the datasets are NumPy arrays. Now I am going to look at the shapes of the datasets using the shape attribute.

In [None]:
print(train_images.shape, train_labels.shape)

### Inspecting the NumPy Array

To visualize a NumPy array as a color scale, I am going to use the matplotlib library.

In [None]:
plt.figure()
plt.imshow(train_images[5])
plt.colorbar()
plt.grid(False)
plt.show()

### Preprocessing the Datasets

The images consist of pixel values between 0 and 255. To train the model faster and to get better accuracy, I am going to normalize the pixel values to the range 0 to 1.

In [None]:
train_images = train_images / 255.0

I am going to build a streaming pipeline using the from_tensor_slices method.

In [None]:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
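
Conceptually, from_tensor_slices cuts the arrays along their first axis and pairs the i-th image with the i-th label. The plain-Python stand-in below mimics that pairing on tiny nested lists. It is an illustration of the idea, not the tf.data implementation. The values are made up and the helper tensor_slices is hypothetical.

```python
# Three hypothetical 2x2 "images" and their labels.
images = [
    [[0.0, 0.1], [0.2, 0.3]],
    [[0.4, 0.5], [0.6, 0.7]],
    [[0.8, 0.9], [1.0, 0.0]],
]
labels = [9, 0, 3]

def tensor_slices(features, targets):
    """Yield one (feature, target) pair per position along the first axis."""
    if len(features) != len(targets):
        raise ValueError("features and targets must have the same length")
    for feature, target in zip(features, targets):
        yield feature, target

pairs = list(tensor_slices(images, labels))
print(len(pairs), pairs[1])
```

Each element of the resulting dataset is therefore one (image, label) pair, which is what the later shuffle, take, skip and batch calls operate on.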

Let me split this dataset into training and validation sets. The hyperparameters are fine-tuned with the validation dataset, and the model is trained with the training dataset.

In [None]:
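SHUFFLE_BUFFER_SIZE = 10000
TRAIN_BATCH_SIZE = 50
# Assumed to equal the validation batch size, so validation is a single batch
VALIDATION_SAMPLE_SIZE = 10000
VALIDATION_BATCH_SIZE = 10000

# Hold out the first VALIDATION_SAMPLE_SIZE examples for validation
validation_ds = train_dataset.take(
    VALIDATION_SAMPLE_SIZE).batch(VALIDATION_BATCH_SIZE)
# Skip the held-out examples so the two sets do not overlap,
# then shuffle the train dataset
train_ds = train_dataset.skip(
    VALIDATION_SAMPLE_SIZE).shuffle(
    SHUFFLE_BUFFER_SIZE).batch(
    TRAIN_BATCH_SIZE).repeat()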
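
The effect of take, skip and batch can be illustrated with a plain-Python stand-in that uses itertools on a small range of integers. This is only a sketch of the semantics, not the tf.data implementation. The sizes are scaled down and the helper batch is hypothetical.

```python
from itertools import islice

def batch(items, size):
    """Group an iterable into lists of at most `size` elements."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk

examples = range(20)            # stand-in for 20 (image, label) pairs
validation_sample_size = 5

# take(n): the first n elements; skip(n): everything after them.
validation = list(islice(examples, validation_sample_size))
training = list(islice(examples, validation_sample_size, None))

validation_batches = list(batch(validation, 5))
training_batches = list(batch(training, 4))

print(len(validation_batches), len(training_batches))
print(set(validation).isdisjoint(training))
```

Taking and skipping the same number of elements from the same unshuffled order is what keeps the two sets disjoint.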

### Building the Model

The datasets are ready. To build the model, I am going to use the Sequential model.

In [None]:
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(30, activation='relu'),
    tf.keras.layers.Dense(10)])

# Compiling the model
model.compile(optimizer=tf.keras.optimizers.RMSprop(),
  loss=tf.keras.losses.SparseCategoricalCrossentropy(
  from_logits=True),
  metrics=['sparse_categorical_accuracy'])

# The validation set is one batch of VALIDATION_BATCH_SIZE examples;
# the remaining examples are used for training.
steps_per_epoch = (len(train_images) - VALIDATION_BATCH_SIZE) // TRAIN_BATCH_SIZE
validation_steps = 1

# Training the model
model.fit(
    train_ds,
    epochs=13, steps_per_epoch=steps_per_epoch,
    validation_data=validation_ds,
    validation_steps=validation_steps)
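
One detail worth spelling out: the last Dense(10) layer has no activation, so the model outputs raw scores (logits), and from_logits=True tells the loss to apply softmax itself. Below is a minimal sketch of that conversion with the standard library, using made-up logits and hypothetical helper names. It illustrates the math, not the Keras implementation.

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    shift = max(logits)  # subtracting the maximum avoids overflow in exp
    exps = [math.exp(x - shift) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

def sparse_categorical_crossentropy(logits, true_class):
    """Cross-entropy for one example with an integer class label."""
    return -math.log(softmax(logits)[true_class])

logits = [2.0, 1.0, 0.1]  # hypothetical scores for three classes
probs = softmax(logits)
loss = sparse_categorical_crossentropy(logits, true_class=0)
print([round(p, 3) for p in probs], round(loss, 3))
```

The labels here are plain integers rather than one-hot vectors, which is why the sparse variant of the loss is used with Fashion MNIST.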

That's it. So train_ds and validation_ds were passed into the training process. In this section, I showed how to create a data pipeline using from_tensor_slices for a dataset that consists of NumPy arrays.