In [1]:
import tensorflow as tf
import numpy as np
import os
import time

In [2]:
# Slice the 1-D tensor 0..9 along its first axis so that each dataset
# element is one scalar.
X = tf.range(10)
dataset = tf.data.Dataset.from_tensor_slices(X)
print(dataset)
# Show every element both as a tensor and as its NumPy value.
for element in dataset:
    print(element, element.numpy())

<TensorSliceDataset shapes: (), types: tf.int32>
tf.Tensor(0, shape=(), dtype=int32) 0
tf.Tensor(1, shape=(), dtype=int32) 1
tf.Tensor(2, shape=(), dtype=int32) 2
tf.Tensor(3, shape=(), dtype=int32) 3
tf.Tensor(4, shape=(), dtype=int32) 4
tf.Tensor(5, shape=(), dtype=int32) 5
tf.Tensor(6, shape=(), dtype=int32) 6
tf.Tensor(7, shape=(), dtype=int32) 7
tf.Tensor(8, shape=(), dtype=int32) 8
tf.Tensor(9, shape=(), dtype=int32) 9


In [3]:
# Chain three passes over the elements, then group them in batches of 7.
# drop_remainder=True discards the final batch if it is incomplete.
dataset = (
    dataset
    .repeat(3)
    .batch(7, drop_remainder=True)
)
print(dataset)
for batch in dataset:
    print(batch, batch.numpy())

<BatchDataset shapes: (7,), types: tf.int32>
tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32) [0 1 2 3 4 5 6]
tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32) [7 8 9 0 1 2 3]
tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32) [4 5 6 7 8 9 0]
tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32) [1 2 3 4 5 6 7]


In [4]:
def _double(x):
    """Return the input multiplied by two."""
    return x * 2

# Apply the transformation to every batch, using up to 4 parallel calls.
dataset = dataset.map(_double, num_parallel_calls=4)
print(dataset)
for batch in dataset:
    print(batch, batch.numpy())

<ParallelMapDataset shapes: (7,), types: tf.int32>
tf.Tensor([ 0  2  4  6  8 10 12], shape=(7,), dtype=int32) [ 0  2  4  6  8 10 12]
tf.Tensor([14 16 18  0  2  4  6], shape=(7,), dtype=int32) [14 16 18  0  2  4  6]
tf.Tensor([ 8 10 12 14 16 18  0], shape=(7,), dtype=int32) [ 8 10 12 14 16 18  0]
tf.Tensor([ 2  4  6  8 10 12 14], shape=(7,), dtype=int32) [ 2  4  6  8 10 12 14]


In [5]:
# Split each batch back into its individual elements. Calling the method
# directly is the idiomatic form; routing the unbound method through
# apply() only adds indirection.
dataset = dataset.unbatch()
print(dataset)


<_UnbatchDataset shapes: (), types: tf.int32>


In [6]:
def _greater_than_ten(x):
    """Predicate keeping only the elements strictly greater than 10."""
    return x > 10

dataset = dataset.filter(_greater_than_ten)
print(dataset)
for element in dataset:
    print(element, element.numpy())

<FilterDataset shapes: (), types: tf.int32>
tf.Tensor(12, shape=(), dtype=int32) 12
tf.Tensor(14, shape=(), dtype=int32) 14
tf.Tensor(16, shape=(), dtype=int32) 16
tf.Tensor(18, shape=(), dtype=int32) 18
tf.Tensor(12, shape=(), dtype=int32) 12
tf.Tensor(14, shape=(), dtype=int32) 14
tf.Tensor(16, shape=(), dtype=int32) 16
tf.Tensor(18, shape=(), dtype=int32) 18
tf.Tensor(12, shape=(), dtype=int32) 12
tf.Tensor(14, shape=(), dtype=int32) 14


In [7]:
# take(3) yields a dataset limited to the first three elements.
for element in dataset.take(3):
    print(element, element.numpy())

tf.Tensor(12, shape=(), dtype=int32) 12
tf.Tensor(14, shape=(), dtype=int32) 14
tf.Tensor(16, shape=(), dtype=int32) 16


In [8]:
# Three passes over 0..9, shuffled through a 5-element buffer (seeded for
# reproducibility), then batched by 7. The last batch may be smaller since
# drop_remainder is not set.
dataset = (
    tf.data.Dataset.range(10)
    .repeat(3)
    .shuffle(buffer_size=5, seed=42)
    .batch(7)
)
print(dataset)
for batch in dataset:
    print(batch, batch.numpy())

<BatchDataset shapes: (None,), types: tf.int64>
tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64) [0 2 3 6 7 9 4]
tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64) [5 0 1 1 8 6 5]
tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64) [4 8 7 1 2 3 0]
tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64) [5 4 2 7 8 9 9]
tf.Tensor([3 6], shape=(2,), dtype=int64) [3 6]


### Split the California dataset to multiple CSV files
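Let's start by loading and preparing the California housing dataset. We first load it, then split it into a training set, a validation set and a test set, and finally we compute the mean and standard deviation of each training feature, which we will use later to scale the inputs: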

In [9]:
from sklearn.preprocessing import StandardScaler
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

housing = fetch_california_housing()

# First split off the test set, then carve a validation set out of the
# remaining data. Targets are reshaped to a column so they can later be
# concatenated with the features.
X_train_all, X_test, y_train_all, y_test = train_test_split(
    housing.data, housing.target.reshape(-1, 1), random_state=42
)
X_train, X_valid, y_train, y_valid = train_test_split(
    X_train_all, y_train_all, random_state=42
)

# Fit on the training features only; keep the per-feature statistics so the
# input pipeline can standardize records itself.
scaler = StandardScaler().fit(X_train)
X_mean, X_std = scaler.mean_, scaler.scale_
print('X_mean:', X_mean)
print('X_std:', X_std)

X_mean: [ 3.89175860e+00  2.86245478e+01  5.45593655e+00  1.09963474e+00
  1.42428122e+03  2.95886657e+00  3.56464315e+01 -1.19584363e+02]
X_std: [1.90927329e+00 1.26409177e+01 2.55038070e+00 4.65460128e-01
 1.09576000e+03 2.36138048e+00 2.13456672e+00 2.00093304e+00]


For a very large dataset that does not fit in memory, you will typically want to split it into many files first, then have TensorFlow read these files in parallel. To demonstrate this, let's start by splitting the housing dataset and saving it to multiple CSV files (20 for the training set, and 10 each for the validation and test sets):

In [10]:
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10):
    """Split `data` row-wise into `n_parts` CSV files under datasets/housing.

    Args:
        data: 2-D array-like (rows x columns) of values to save.
        name_prefix: Tag inserted in each file name ("my_<prefix>_<NN>.csv").
        header: Optional header line written at the top of every part;
            nothing is written when it is None.
        n_parts: Number of files to split the rows into. Rows are divided
            with np.array_split, so part sizes differ by at most one row.

    Returns:
        The list of file paths written, in part order.
    """
    housing_dir = os.path.join("datasets", "housing")
    os.makedirs(housing_dir, exist_ok=True)
    path_format = os.path.join(housing_dir, "my_{}_{:02d}.csv")

    filepaths = []
    m = len(data)
    for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)):
        part_csv = path_format.format(name_prefix, file_idx)
        filepaths.append(part_csv)
        with open(part_csv, "wt", encoding="utf-8") as f:
            if header is not None:
                f.write(header)
                f.write("\n")
            for row_idx in row_indices:
                # Convert NumPy scalars to plain Python values before repr():
                # NumPy >= 2 renders repr(np.float64(1.5)) as
                # "np.float64(1.5)", which would corrupt the CSV. Python
                # floats keep the shortest round-trip representation.
                row = np.asarray(data[row_idx]).tolist()
                f.write(",".join([repr(col) for col in row]))
                f.write("\n")
    return filepaths

# Append the target column to the features of each split so every CSV row
# holds one complete record.
train_data = np.hstack((X_train, y_train))
valid_data = np.hstack((X_valid, y_valid))
test_data = np.hstack((X_test, y_test))

# Column names: the dataset's feature names followed by the target name.
header_cols = [*housing.feature_names, "MedianHouseValue"]
header = ",".join(header_cols)

# The training set is spread over 20 files, the other two over 10 each.
train_filepaths = save_to_multiple_csv_files(train_data, "train", header, n_parts=20)
valid_filepaths = save_to_multiple_csv_files(valid_data, "valid", header, n_parts=10)
test_filepaths = save_to_multiple_csv_files(test_data, "test", header, n_parts=10)

In [11]:
import pandas as pd
# Sanity check: load the first training part back and display its first rows.
pd.read_csv(train_filepaths[0]).head()

Unnamed: 0,MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
0,3.5214,15.0,3.049945,1.106548,1447.0,1.605993,37.63,-122.43,1.442
1,5.3275,5.0,6.49006,0.991054,3464.0,3.44334,33.69,-117.39,1.687
2,3.1,29.0,7.542373,1.591525,1328.0,2.250847,38.44,-122.98,1.621
3,7.1736,12.0,6.289003,0.997442,1054.0,2.695652,33.55,-117.7,2.621
4,2.0549,13.0,5.312457,1.085092,3297.0,2.244384,33.93,-116.93,0.956


In [12]:
# Print the first five raw lines (header + four records) of the first
# training file. The parts are written as UTF-8, so read them back with the
# same encoding instead of relying on the platform default.
with open(train_filepaths[0], encoding="utf-8") as f:
    for i in range(5):
        print(f.readline(), end="")

MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue
3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442
5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687
3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621
7.1736,12.0,6.289002557544757,0.9974424552429667,1054.0,2.6956521739130435,33.55,-117.7,2.621


In [13]:
# Display the list of training file paths returned by save_to_multiple_csv_files.
train_filepaths

['datasets/housing/my_train_00.csv',
 'datasets/housing/my_train_01.csv',
 'datasets/housing/my_train_02.csv',
 'datasets/housing/my_train_03.csv',
 'datasets/housing/my_train_04.csv',
 'datasets/housing/my_train_05.csv',
 'datasets/housing/my_train_06.csv',
 'datasets/housing/my_train_07.csv',
 'datasets/housing/my_train_08.csv',
 'datasets/housing/my_train_09.csv',
 'datasets/housing/my_train_10.csv',
 'datasets/housing/my_train_11.csv',
 'datasets/housing/my_train_12.csv',
 'datasets/housing/my_train_13.csv',
 'datasets/housing/my_train_14.csv',
 'datasets/housing/my_train_15.csv',
 'datasets/housing/my_train_16.csv',
 'datasets/housing/my_train_17.csv',
 'datasets/housing/my_train_18.csv',
 'datasets/housing/my_train_19.csv']

### Building an Input Pipeline

In [14]:
# Dataset of file paths; list_files shuffles them by default, and the seed
# makes that order reproducible.
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42)
for filepath in filepath_dataset:
    print(filepath)

tf.Tensor(b'datasets/housing/my_train_05.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_16.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_01.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_17.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_00.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_14.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_10.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_02.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_12.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_19.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_07.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_09.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_13.csv', shape=(), dtype=string)
tf.Tensor(b'datasets/housing/my_train_15.csv', shape=(), dtype=string)
tf.Ten

In [15]:
n_readers = 5

def _data_lines(filepath):
    """Return the text lines of one CSV file, skipping its header row."""
    return tf.data.TextLineDataset(filepath).skip(1)

# Read from `n_readers` files at a time, interleaving their lines.
dataset = filepath_dataset.interleave(_data_lines, cycle_length=n_readers)


In [16]:
# Show the first five interleaved records as raw byte strings.
for csv_line in dataset.take(5):
    print(csv_line.numpy())

b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418'
b'2.4792,24.0,3.4547038327526134,1.1341463414634145,2251.0,3.921602787456446,34.18,-118.38,2.0'
b'4.2708,45.0,5.121387283236994,0.953757225433526,492.0,2.8439306358381504,37.48,-122.19,2.67'
b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205'
b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215'


### Preprocessing the Data

In [17]:
n_inputs = 8

@tf.function
def preprocess(line):
    """Parse one CSV record into a (scaled features, target) pair.

    Args:
        line: A scalar string holding `n_inputs` feature values followed by
            the target value, comma-separated.

    Returns:
        A tuple `(x, y)`: `x` stacks the feature fields and standardizes
        them with the module-level `X_mean` / `X_std`; `y` is the target
        stacked into a 1-element tensor.
    """
    # Features default to 0.0 when missing; the empty float32 default marks
    # the target column as required.
    feature_defaults = [0.] * n_inputs
    target_default = [tf.constant([], dtype=tf.float32)]
    fields = tf.io.decode_csv(line, record_defaults=feature_defaults + target_default)

    features = tf.stack(fields[:-1])
    target = tf.stack(fields[-1:])
    scaled_features = (features - X_mean) / X_std
    return scaled_features, target

preprocess(b'4.2083,44.0,5.3232,0.9171,846.0,2.3370,37.47,-122.2,2.782')

(<tf.Tensor: id=242, shape=(8,), dtype=float32, numpy=
 array([ 0.16579157,  1.216324  , -0.05204565, -0.39215982, -0.5277444 ,
        -0.2633488 ,  0.8543046 , -1.3072058 ], dtype=float32)>,
 <tf.Tensor: id=243, shape=(1,), dtype=float32, numpy=array([2.782], dtype=float32)>)

In [18]:
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
                       n_read_threads=None, shuffle_buffer_size=10000,
                       n_parse_threads=5, batch_size=32):
    """Build a batched dataset of preprocessed records from CSV files.

    Args:
        filepaths: File paths (or patterns) passed to `Dataset.list_files`.
        repeat: Number of times the file list is repeated (passed to
            `Dataset.repeat`).
        n_readers: Number of files interleaved at a time (`cycle_length`).
        n_read_threads: `num_parallel_calls` for the interleave step.
        shuffle_buffer_size: Size of the record shuffle buffer.
        n_parse_threads: `num_parallel_calls` for the `preprocess` map.
        batch_size: Number of records per batch.

    Returns:
        A dataset of `preprocess` outputs, batched, with one batch prefetched.
    """
    def read_data_lines(filepath):
        # Each file starts with a header row that must not be parsed.
        return tf.data.TextLineDataset(filepath).skip(1)

    files = tf.data.Dataset.list_files(filepaths).repeat(repeat)
    lines = files.interleave(read_data_lines,
                             cycle_length=n_readers,
                             num_parallel_calls=n_read_threads)
    shuffled_lines = lines.shuffle(shuffle_buffer_size)
    records = shuffled_lines.map(preprocess, num_parallel_calls=n_parse_threads)
    return records.batch(batch_size).prefetch(1)
    

In [19]:
# Small batches so the printed tensors stay readable.
train_dataset = csv_reader_dataset(train_filepaths, batch_size=3)
first_batches = train_dataset.take(3)
for X_batches, y_batches in first_batches:
    print("X = ", X_batches)
    print("y = ", y_batches)

X =  tf.Tensor(
[[ 0.26949593  0.26702586 -0.30118766 -0.20901318 -0.4383088  -0.31460586
  -0.9493411   0.83679074]
 [ 0.9041353   1.216324    0.2658563  -0.14210017 -0.8489827  -0.2520688
  -0.79005814  0.5819124 ]
 [ 0.17385744 -0.2867314   0.21145487 -0.10381573  0.51993024 -0.24536143
   1.2290876  -1.4321475 ]], shape=(3, 8), dtype=float32)
y =  tf.Tensor(
[[5.00001]
 [3.659  ]
 [2.623  ]], shape=(3, 1), dtype=float32)
X =  tf.Tensor(
[[ 0.5788807  -1.5524622   0.50143665 -0.01800381  1.5146736  -0.01156432
  -0.14824045  0.24706951]
 [ 5.8180466  -0.7613805   0.8509813  -0.3442633  -1.0561448  -0.11090836
   0.75123864 -1.1972603 ]
 [-0.83804595 -1.1569214   0.25472802 -0.18736824 -0.93568045 -0.20352748
   1.6741431  -0.82743007]], shape=(3, 8), dtype=float32)
y =  tf.Tensor(
[[1.429  ]
 [5.00001]
 [1.451  ]], shape=(3, 1), dtype=float32)
X =  tf.Tensor(
[[-0.92713743 -1.0778131  -0.04753245  0.14993049 -0.7686731  -0.2345695
   1.9364903  -0.49258715]
 [-0.532537    0.6625668 

### Using the Dataset With tf.keras

In [20]:
# The training set repeats indefinitely (repeat=None), so the training code
# decides how many steps to run; the validation and test sets make one pass.
train_set = csv_reader_dataset(filepaths=train_filepaths, repeat=None)
valid_set = csv_reader_dataset(filepaths=valid_filepaths)
test_set = csv_reader_dataset(filepaths=test_filepaths)

In [21]:
# Small regression network: one hidden ReLU layer of 30 units, then a single
# linear output unit. The input shape is the number of feature columns.
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]))
model.add(tf.keras.layers.Dense(1))

In [22]:
# Use the `learning_rate` keyword: `lr` is a deprecated alias, and the other
# optimizers in this notebook already use `learning_rate`.
model.compile(loss="mse", optimizer=tf.keras.optimizers.SGD(learning_rate=1e-3))

In [23]:
batch_size = 32
# train_set repeats forever, so Keras must be told how many batches make up
# one epoch.
steps_per_epoch = len(X_train) // batch_size
model.fit(
    train_set,
    steps_per_epoch=steps_per_epoch,
    epochs=10,
    validation_data=valid_set,
)

Train for 362 steps
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x7ff315036450>

In [24]:
# test_set makes a single finite pass, so let Keras consume it entirely.
# Passing steps=len(X_test) // batch_size would stop before the final
# partial batch and leave those instances out of the reported loss.
model.evaluate(test_set)



0.4942636078003771

In [25]:
# Keep only the features (we could instead just pass test_set, Keras would
# ignore the labels).
new_set = test_set.map(lambda X, y: X)
X_new = X_test
# new_set is finite, so predict() can run until it is exhausted; passing
# steps=len(X_new) // batch_size would skip the final partial batch.
# NOTE: csv_reader_dataset shuffles files and records, so the rows of the
# result are NOT in the same order as the rows of X_new.
model.predict(new_set)

array([[0.68791395],
       [2.364013  ],
       [3.2024662 ],
       ...,
       [2.188284  ],
       [2.0472136 ],
       [1.3008043 ]], dtype=float32)

In [39]:
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-3)
loss_fn = tf.keras.losses.mean_squared_error

n_epochs = 5
batch_size = 32
step_per_epoch = len(X_train)//batch_size
total_steps = step_per_epoch * n_epochs

# train_set repeats forever, so bound the loop explicitly with take().
global_step = 0
for global_step, (X_batch, y_batch) in enumerate(train_set.take(total_steps), start=1):
    progress = "\rGlobal step {}/{}".format(global_step, total_steps)
    print(progress, end="")
    with tf.GradientTape() as tape:
        y_pred = model(X_batch)
        per_instance_loss = loss_fn(y_batch, y_pred)
        main_loss = tf.reduce_mean(per_instance_loss)
        # Add any extra losses registered on the model (e.g. regularizers).
        loss = tf.add_n([main_loss] + model.losses)
    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

Global step 1810/1810

In [40]:
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-3)
loss_fn = tf.keras.losses.mean_squared_error

@tf.function
def train(model, n_epochs, batch_size=32,
          n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
    """Train `model` on the training CSV files inside a single TF function.

    The dataset is built from the module-level `train_filepaths` and repeated
    `n_epochs` times; the loop runs until it is exhausted. Gradients are
    applied with the module-level `optimizer`, using `loss_fn` plus any
    losses registered on the model.

    Args:
        model: The Keras model to train.
        n_epochs: Number of passes over the training files.
        batch_size: Number of records per batch.
        n_readers, n_read_threads, shuffle_buffer_size, n_parse_threads:
            Forwarded to `csv_reader_dataset`.
    """
    train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, batch_size=batch_size,
                                   n_readers=n_readers, n_read_threads=n_read_threads,
                                   n_parse_threads=n_parse_threads,
                                   shuffle_buffer_size=shuffle_buffer_size)
    for X_batch, y_batch in train_set:
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            # Keras losses take (y_true, y_pred). MSE happens to be symmetric,
            # but the correct order keeps this loop valid for other losses.
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

train(model, n_epochs=5)

In [43]:
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-3)
loss_fn = tf.keras.losses.mean_squared_error

@tf.function
def train(model, n_epochs, batch_size=32,
          n_readers=5, n_read_threads=5, shuffle_buffer_size=10000, n_parse_threads=5):
    """Train `model` inside a TF function, printing progress every 100 steps.

    The dataset is built from the module-level `train_filepaths` and repeated
    `n_epochs` times; the loop is bounded to
    `(len(X_train) // batch_size) * n_epochs` steps. Gradients are applied
    with the module-level `optimizer`, using `loss_fn` plus any losses
    registered on the model.

    Args:
        model: The Keras model to train.
        n_epochs: Number of passes over the training files.
        batch_size: Number of records per batch; also used to compute the
            number of steps per epoch.
        n_readers, n_read_threads, shuffle_buffer_size, n_parse_threads:
            Forwarded to `csv_reader_dataset`.
    """
    train_set = csv_reader_dataset(train_filepaths, repeat=n_epochs, batch_size=batch_size,
                                   n_readers=n_readers, n_read_threads=n_read_threads,
                                   n_parse_threads=n_parse_threads,
                                   shuffle_buffer_size=shuffle_buffer_size)
    # Use the caller's batch_size (no hard-coded override) so the step count
    # matches the batches the dataset actually produces.
    step_per_epoch = len(X_train) // batch_size
    total_steps = step_per_epoch * n_epochs
    global_step = 0
    for X_batch, y_batch in train_set.take(total_steps):
        global_step += 1
        if tf.equal(global_step % 100, 0):
            tf.print("\rGlobal step", global_step, "/", total_steps)
        with tf.GradientTape() as tape:
            y_pred = model(X_batch)
            # Keras losses take (y_true, y_pred). MSE happens to be symmetric,
            # but the correct order keeps this loop valid for other losses.
            main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
            loss = tf.add_n([main_loss] + model.losses)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

train(model, n_epochs=5)

Global step 100 / 1810
Global step 200 / 1810
Global step 300 / 1810
Global step 400 / 1810
Global step 500 / 1810
Global step 600 / 1810
Global step 700 / 1810
Global step 800 / 1810
Global step 900 / 1810
Global step 1000 / 1810
Global step 1100 / 1810
Global step 1200 / 1810
Global step 1300 / 1810
Global step 1400 / 1810
Global step 1500 / 1810
Global step 1600 / 1810
Global step 1700 / 1810
Global step 1800 / 1810


In [None]:
# Print each public attribute of tf.data.Dataset with the first line of its
# docstring.
for m in dir(tf.data.Dataset):
    if not (m.startswith("_") or m.endswith("_")):
        func = getattr(tf.data.Dataset, m)
        # Every object has a __doc__ attribute, but it can be None; checking
        # the value (not just its presence) avoids calling .split on None.
        doc = getattr(func, "__doc__", None)
        if doc:
            print("● {:21s}{}".format(m + "()", doc.split("\n")[0]))