# Datasets

The preferred way to ingest data into TensorFlow estimators is by using the `tf.data.Dataset` class. There are a few reasons for this:
1. Datasets automatically manage memory and resources.
2. They separate data ingestion from modeling. This means that modeling steps can be run concurrently with data I/O operations, speeding up training.
3. They make batching and shuffling of giant datasets split across multiple files easy.

**Note:** There are lots of examples online using things like `QueueRunner`s (the older, queue-based way of building input pipelines) that were popular before `Dataset`s were introduced.

## 1. The data

We need some data to work with. To start, we will just use a few .csv files. Later on we'll talk about .tfrecords, which is the preferred data format for TensorFlow.

Let's load up the Boston housing data set as our test data. To make things more interesting, we'll make the columns have some different data types and split it into several .csv files. (Clearly this is entirely unnecessary to do with these data, but it will put us in a situation closer to reality.)

In [None]:
import pathlib

import numpy as np
import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
plt.style.use('seaborn')
from sklearn.datasets import load_boston

In [2]:
boston = load_boston()
features, labels = boston.data, boston.target
columns = [c.lower() for c in boston.feature_names]
df = pd.DataFrame(features, columns=columns)
df['chas'] = df['chas'].map({0.: 'Y', 1.: 'N'})
df['rad'] = df['rad'].astype(np.int64)
df['target'] = labels

In [3]:
df.head()

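| | crim | zn | indus | chas | nox | rm | age | dis | rad | tax | ptratio | b | lstat | target |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.00632 | 18.0 | 2.31 | Y | 0.538 | 6.575 | 65.2 | 4.0900 | 1 | 296.0 | 15.3 | 396.90 | 4.98 | 24.0 |
| 1 | 0.02731 | 0.0 | 7.07 | Y | 0.469 | 6.421 | 78.9 | 4.9671 | 2 | 242.0 | 17.8 | 396.90 | 9.14 | 21.6 |
| 2 | 0.02729 | 0.0 | 7.07 | Y | 0.469 | 7.185 | 61.1 | 4.9671 | 2 | 242.0 | 17.8 | 392.83 | 4.03 | 34.7 |
| 3 | 0.03237 | 0.0 | 2.18 | Y | 0.458 | 6.998 | 45.8 | 6.0622 | 3 | 222.0 | 18.7 | 394.63 | 2.94 | 33.4 |
| 4 | 0.06905 | 0.0 | 2.18 | Y | 0.458 | 7.147 | 54.2 | 6.0622 | 3 | 222.0 | 18.7 | 396.90 | 5.33 | 36.2 |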


In [4]:
# Split into multiple files
n_shards = 5
shard_size = len(df) // n_shards

data_dir = pathlib.Path('../data/sharded_data')
# Create the directory (and any missing parent directories) if needed
data_dir.mkdir(parents=True, exist_ok=True)

df = df.sample(frac=1)
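for i in range(n_shards):
    idx_start = i * shard_size
    # The last shard also takes the remainder so that no rows are dropped
    idx_end = (i + 1) * shard_size if i < n_shards - 1 else len(df)
    df.iloc[idx_start:idx_end].to_csv(data_dir / 'boston-{0}.csv'.format(i), index=False)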

## 2. Reading in data from a file

The general way we get data out of a `Dataset` is by instantiating a `Dataset` object, converting it to an iterator using the `make_one_shot_iterator` method, and getting a batch of data with the iterator's `get_next` method. The `get_next` method returns a tensor (or a nested structure of tensors) that yields the next element each time it is evaluated in a session, which is why it is called only once (instead of each time we want to get the next batch of data).

Since we are reading in a single .csv, we use `TextLineDataset`, which reads in plain text files and returns a dataset where the rows of the text document are the records of the dataset. 

In [5]:
# Read a single file as text
file = (data_dir / 'boston-0.csv').as_posix()
dataset = tf.data.TextLineDataset(file)
iterator = dataset.make_one_shot_iterator()
batch = iterator.get_next()

with tf.Session() as sess:
    batch1 = sess.run(batch)
    batch2 = sess.run(batch)
    batch3 = sess.run(batch)
    
for b in (batch1, batch2, batch3):
    print(b)

b'crim,zn,indus,chas,nox,rm,age,dis,rad,tax,ptratio,b,lstat,target'
b'0.0837,45.0,3.44,Y,0.437,7.185,38.9,4.5667,5,398.0,15.2,396.9,5.39,34.9'
b'0.05023,35.0,6.06,Y,0.4379,5.706,28.4,6.6407,1,304.0,16.9,394.02,12.43,17.1'


Note that the three batches we got are each just a single string, and we also got the text of the header. We don't want to be including the header at all in the data, and we want an array for each row, not just a single string. 

The way datasets are modified is by chaining on methods which change the behavior of the dataset. Dealing with the header is straightforward; we can just use the dataset's `skip` method to skip the first row. To parse the rows as arrays and not a single string, we use the `map` method, which will apply the same function to every row of the dataset. TensorFlow provides a `decode_csv` function which converts a string tensor representing a row of a csv file into a collection of tensors, one for each field of the csv.
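To make the role of the default values concrete, here is a plain-Python analogy of what `decode_csv` does with its `record_defaults` argument. This is only an illustration and not TensorFlow's implementation; `decode_row` is a hypothetical helper written for this sketch. The type of each default determines the type of the parsed field, and the default itself is used when a field is empty.

```python
import csv


def decode_row(row, record_defaults):
    """Parse one CSV line, casting each field to the type of its default.

    Plain-Python analogy of decode_csv; not TensorFlow's implementation.
    """
    fields = next(csv.reader([row]))
    if len(fields) != len(record_defaults):
        raise ValueError(
            'expected {} fields, got {}'.format(len(record_defaults), len(fields)))
    parsed = []
    for text, (default,) in zip(fields, record_defaults):
        # An empty field falls back to the default value;
        # otherwise the text is cast to the default's type.
        parsed.append(default if text == '' else type(default)(text))
    return tuple(parsed)


# One float column, one string column, one integer column
defaults = [[0.0], ['_UNKNOWN'], [0]]
example_full = decode_row('0.538,Y,5', defaults)
example_missing = decode_row(',,', defaults)
print(example_full)
print(example_missing)
```

The second call shows why a default is needed for every column: it is what a row with missing fields gets filled in with.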

In [6]:
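# Decode csv requires a list of default values to use for each tensor
# produced. The defaults are passed as a list of lists, and the type of
# each default determines the type of the corresponding column.
DEFAULT_VALUES = [[0.0]] * 14
DEFAULT_VALUES[3] = ['_UNKNOWN']  # chas is a string column
DEFAULT_VALUES[8] = [0]  # rad is an integer column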

def parse_row(row):
    return tf.decode_csv(row, record_defaults=DEFAULT_VALUES)

dataset = tf.data.TextLineDataset(file)
dataset = dataset.skip(1) # skip the header
dataset = dataset.map(parse_row) # convert each row string to a tuple of fields
iterator = dataset.make_one_shot_iterator()
batch = iterator.get_next()

with tf.Session() as sess:
    batch1 = sess.run(batch)
    batch2 = sess.run(batch)
    batch3 = sess.run(batch)
    
for b in (batch1, batch2, batch3):
    print(b)

(0.0837, 45.0, 3.44, b'Y', 0.437, 7.185, 38.9, 4.5667, 5, 398.0, 15.2, 396.9, 5.39, 34.9)
(0.05023, 35.0, 6.06, b'Y', 0.4379, 5.706, 28.4, 6.6407, 1, 304.0, 16.9, 394.02, 12.43, 17.1)
(0.03961, 0.0, 5.19, b'Y', 0.515, 6.037, 34.5, 5.9853, 5, 224.0, 20.2, 396.9, 8.01, 21.1)


*Aside:* Since `batch` is now a tuple of tensors instead of a single tensor, it has no `eval` method of its own, so we have to use `sess.run(batch)` rather than `batch.eval()`.

If all of our data is in a single file, that is it. We have our data input pipeline. We can apply additional methods to spruce up our Dataset by shuffling the data, taking batches of more than just one element, improving memory management, and so on.


## 3. Reading in data from multiple files

Now that we've successfully read data from a single file, let's do multiple. The general idea is to first make a dataset of file names, and use the `map` method to make a dataset of datasets. This doesn't literally work: the iterator made from a dataset returns tensors, so each element has to have one of the allowable tensor datatypes. Hence, it can't return a dataset itself. However, there is a `flat_map` method which maps each element (here, each file name) to a dataset and then flattens the resulting datasets into a single dataset, one after the other. This avoids ever actually having a dataset that returns tensors of type "dataset".
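The difference between `map` and `flat_map` can be sketched with a plain-Python analogy. This is an illustration only, not TensorFlow code; the `shard-*.csv` files and the `read_rows` helper are made up for the example.

```python
import itertools
import pathlib
import tempfile


def read_rows(path):
    """Return the data lines of one file, skipping the header.

    Plays the role of TextLineDataset(f).skip(1) in this analogy.
    """
    with open(path) as f:
        lines = f.read().splitlines()
    return lines[1:]


# Write two tiny stand-in shards (hypothetical contents)
tmp_dir = pathlib.Path(tempfile.mkdtemp())
for i, rows in enumerate([['1,a', '2,b'], ['3,c']]):
    shard = tmp_dir / 'shard-{0}.csv'.format(i)
    shard.write_text('\n'.join(['x,y'] + rows) + '\n')

file_names = sorted(tmp_dir.glob('shard-*.csv'))

# "map" alone: one collection of rows per file (a dataset of datasets)
nested = [read_rows(f) for f in file_names]
# "flat_map": the per-file rows are chained into one stream of rows
flat = list(itertools.chain.from_iterable(read_rows(f) for f in file_names))

print(nested)
print(flat)
```

The nested result has one entry per file, while the flattened result has one entry per row, which is the shape we want to feed into `parse_row`.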

In [7]:
# Can use wildcards for data with similar names
file = (data_dir / 'boston-*.csv').as_posix()
dataset = tf.data.Dataset.list_files(file)
iterator = dataset.make_one_shot_iterator()
batch = iterator.get_next()

with tf.Session() as sess:
    batch1 = sess.run(batch)
    batch2 = sess.run(batch)
    batch3 = sess.run(batch)

# Just getting a dataset of individual file names
for b in (batch1, batch2, batch3):
    print(b)

b'data\\sharded_data\\boston-0.csv'
b'data\\sharded_data\\boston-4.csv'
b'data\\sharded_data\\boston-2.csv'


In [8]:
# Convert each file name into a dataset and flat_map

# Get dataset of file names
dataset = tf.data.Dataset.list_files(file)
# Combine all files into a single text dataset (without headers)
dataset = dataset.flat_map(lambda f: tf.data.TextLineDataset(f).skip(1))
# Convert each row into a tuple
dataset = dataset.map(parse_row)
iterator = dataset.make_one_shot_iterator()
batch = iterator.get_next()

with tf.Session() as sess:
    batch1 = sess.run(batch)
    batch2 = sess.run(batch)
    batch3 = sess.run(batch)

# Now getting parsed rows drawn from all of the files
for b in (batch1, batch2, batch3):
    print(b)

(0.7258, 0.0, 8.14, b'Y', 0.538, 5.727, 69.5, 3.7965, 4, 307.0, 21.0, 390.95, 11.28, 18.2)
(51.1358, 0.0, 18.1, b'Y', 0.597, 5.757, 100.0, 1.413, 24, 666.0, 20.2, 2.6, 10.11, 15.0)
(0.05735, 0.0, 4.49, b'Y', 0.449, 6.63, 56.1, 4.4377, 3, 247.0, 18.5, 392.3, 6.53, 26.6)


## 4. Some handy methods

While actually training a model, there are a few things we want to do:
1. Shuffle the data
2. Repeat the dataset for training over multiple epochs
3. Get batches of data
4. Preload the next batch of data while training...

(We also want to feed data into the `Estimator` during training as a tuple consisting of a dict of features and a label. This can be done in the `parse_row` function we wrote above. We'll go into this in more detail when we talk about `Estimators`.)
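As a rough sketch of what that could look like, the helper below turns a tuple of parsed fields into a `(features, label)` pair. The `COLUMNS` list and the `split_features_and_label` name are assumptions made for this example; the column names are taken from the header of the .csv files above. It is written in plain Python and demonstrated on an ordinary tuple.

```python
COLUMNS = ['crim', 'zn', 'indus', 'chas', 'nox', 'rm', 'age', 'dis',
           'rad', 'tax', 'ptratio', 'b', 'lstat', 'target']


def split_features_and_label(fields, columns=COLUMNS, label_name='target'):
    """Turn a sequence of parsed fields into (dict of features, label)."""
    # Pair each column name with its field, then pull the label out
    features = dict(zip(columns, fields))
    label = features.pop(label_name)
    return features, label


# One parsed row, copied from the output shown earlier
row = (0.0837, 45.0, 3.44, b'Y', 0.437, 7.185, 38.9, 4.5667, 5, 398.0,
       15.2, 396.9, 5.39, 34.9)
features, label = split_features_and_label(row)
print(label)
print(sorted(features))
```

Because the helper only zips names with fields, the same logic could presumably be applied inside `parse_row` to the tensors returned by `decode_csv`.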

In [9]:
# Training parameters
n_epochs = 5
batch_size = 2

# Build data set
dataset = tf.data.Dataset.list_files(file)
dataset = dataset.flat_map(lambda f: tf.data.TextLineDataset(f).skip(1))
dataset = dataset.map(parse_row)
# Repeat the dataset
dataset = dataset.repeat(n_epochs)
# Shuffle data
dataset = dataset.shuffle(buffer_size=1024)
# Get a batch of data
dataset = dataset.batch(batch_size)
# Preload the next batch to speed up training. After batch(), each
# element of the dataset is a whole batch, so buffer_size counts batches.
dataset = dataset.prefetch(buffer_size=1)
iterator = dataset.make_one_shot_iterator()
batch = iterator.get_next()

with tf.Session() as sess:
    batch1 = sess.run(batch)

print(batch1)

(array([2.44953, 3.67822], dtype=float32), array([0., 0.], dtype=float32), array([19.58, 18.1 ], dtype=float32), array([b'Y', b'Y'], dtype=object), array([0.605, 0.77 ], dtype=float32), array([6.402, 5.362], dtype=float32), array([95.2, 96.2], dtype=float32), array([2.2625, 2.1036], dtype=float32), array([ 5, 24]), array([403., 666.], dtype=float32), array([14.7, 20.2], dtype=float32), array([330.04, 380.79], dtype=float32), array([11.32, 10.19], dtype=float32), array([22.3, 20.8], dtype=float32))


A couple of remarks:
1. The number of repeats can be set to `None` (or left out), in which case the dataset repeats indefinitely. The iterator then never runs out of data, so a model being fed by the dataset keeps training until something else stops it, such as the `steps` or `max_steps` argument of the estimator's `train` method.
2. When shuffling, the `buffer_size` parameter specifies how many records (not bytes) are held in memory and sampled from. The smaller this number is, the less randomized the data will actually be; the larger it is, the more memory is used. A buffer at least as large as the dataset gives a uniform shuffle. Here the buffer holds up to 1024 records, which is more than the roughly 500 rows in our data. For data sets too big to fit in memory, you'd want as large a buffer as your RAM allows.
3. For prefetching, `buffer_size` specifies how many elements to prepare in advance. Since `prefetch` comes after `batch` here, each element is a whole batch rather than a single record. This is useful for speeding up training by allowing the dataset to load and process the next batch of training data while the previous batch is being consumed by the model.
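To see why a small shuffle buffer gives a poorly randomized order, here is a plain-Python sketch of a fixed-size shuffle buffer. It is meant to illustrate the idea only and is not TensorFlow's implementation; `buffered_shuffle` is a hypothetical helper, and it assumes `buffer_size` is at least 1.

```python
import random


def buffered_shuffle(records, buffer_size, seed=0):
    """Yield records in the order produced by a fixed-size shuffle buffer.

    Plain-Python sketch of the idea; not TensorFlow's implementation.
    Assumes buffer_size >= 1.
    """
    rng = random.Random(seed)
    buffer = []
    for record in records:
        if len(buffer) < buffer_size:
            # Still filling the buffer: nothing is emitted yet
            buffer.append(record)
        else:
            # Buffer is full: emit a random slot and refill it
            i = rng.randrange(buffer_size)
            yield buffer[i]
            buffer[i] = record
    # Input exhausted: drain what is left in random order
    rng.shuffle(buffer)
    yield from buffer


records = list(range(20))
small = list(buffered_shuffle(records, buffer_size=2))
large = list(buffered_shuffle(records, buffer_size=len(records)))
print(small)
print(large)
```

With `buffer_size=2`, the record emitted at position `k` can only come from the first `k + 2` inputs, so the output stays close to the original order. With a buffer as large as the input, every ordering is possible.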

There are a lot of other things that can be done to improve the efficiency of this bad boy, such as using "fused ops" which do several of these steps at once. For more information check out https://www.tensorflow.org/guide/performance/datasets