In [1]:
import os
import time
import tensorflow as tf

## Reading files efficiently

#### Datasets basics

In [2]:
# Toy source dataset built with Dataset.range(5); the following cells
# transform and iterate over it.
dataset = tf.data.Dataset.range(5)

In [3]:
# Built directly from the source range so that re-running this cell always
# yields the same pipeline, instead of stacking another repeat/shuffle/batch
# on top of whatever `dataset` currently holds.
dataset = (
    tf.data.Dataset.range(5)
    .repeat(2)                        # go through the source twice
    .shuffle(buffer_size=3, seed=73)  # small buffer, fixed op-level seed
    .batch(3)                         # groups of 3; the last batch may be shorter
)

In [4]:
# Iterate the dataset eagerly and show every element it yields.
for batch_tensor in dataset:
    print(batch_tensor)

tf.Tensor([1 3 4], shape=(3,), dtype=int64)
tf.Tensor([2 0 1], shape=(3,), dtype=int64)
tf.Tensor([3 2 4], shape=(3,), dtype=int64)
tf.Tensor([0], shape=(1,), dtype=int64)


#### Data generation

In [5]:
def gen_csv(n, n_rows=100, directory='data'):
    """Write a CSV file of random floats to ``<directory>/sample<n>.csv``.

    The header row is the label sequence a, b, d, c, e repeated 100 times
    (500 columns). It is followed by ``n_rows`` rows of values drawn with
    ``tf.random.uniform`` using its default range. The directory is created
    if it does not exist; the file is opened in write mode, so an existing
    file with the same name is overwritten.

    Args:
        n: Value interpolated into the file name (``sample{n}.csv``).
        n_rows: Number of data rows to write. Defaults to 100.
        directory: Directory the file is written to. Defaults to ``'data'``.
    """
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, f'sample{n}.csv')
    cols = ['a', 'b', 'd', 'c', 'e'] * 100
    # Convert to NumPy once instead of slicing and converting the eager
    # tensor row by row inside the loop.
    values = tf.random.uniform((n_rows, len(cols))).numpy()
    with open(path, 'w') as f:
        f.write(','.join(cols) + '\n')
        for row in values:
            f.write(','.join(str(v) for v in row) + '\n')

In [6]:
%%capture
[gen_csv(n) for n in range(4)]

#### Reading from different files

In [7]:
# Glob pattern matching the generated sample files.
# A list of file paths can be used here instead of a single pattern.
filepaths = "data/sample*.csv"

In [8]:
# Dataset of file names matching the pattern, in a shuffled order that is
# fixed by the seed.
filepath_dataset = tf.data.Dataset.list_files(
    filepaths,
    shuffle=True,
    seed=73,
)

In [9]:
# Show the file names in the order the dataset yields them.
for path_tensor in filepath_dataset:
    print(path_tensor)

tf.Tensor(b'data/sample2.csv', shape=(), dtype=string)
tf.Tensor(b'data/sample3.csv', shape=(), dtype=string)
tf.Tensor(b'data/sample0.csv', shape=(), dtype=string)
tf.Tensor(b'data/sample1.csv', shape=(), dtype=string)


In [10]:
def preprocess(line):
    """Parse one CSV text line into a single stacked tensor.

    The line is decoded with 500 float defaults of 0.0, i.e. one default per
    expected column, and the resulting per-column values are stacked into
    one tensor.

    Args:
        line: A CSV-formatted string tensor, as passed in by ``Dataset.map``.

    Returns:
        The decoded column values combined with ``tf.stack``.
    """
    column_defaults = [0.] * 500
    fields = tf.io.decode_csv(line, record_defaults=column_defaults)
    return tf.stack(fields)

In [11]:
def process(numbers):
    """Apply a fixed chain of element-wise math ops to ``numbers``.

    Computes ``tanh(log(sqrt(sinh(square(cos(sin(numbers)))))))``, written
    out one step at a time.

    Args:
        numbers: Tensor (or tensor-like value) accepted by ``tf.math`` ops.

    Returns:
        The result of the final ``tf.math.tanh`` call.
    """
    sine = tf.math.sin(numbers)
    cosine = tf.math.cos(sine)
    squared = tf.math.square(cosine)
    hyperbolic = tf.math.sinh(squared)
    rooted = tf.math.sqrt(hyperbolic)
    logged = tf.math.log(rooted)
    return tf.math.tanh(logged)

##### Reading sequentially

In [12]:
# Sequential baseline: no num_parallel_calls anywhere in the pipeline.
dataset = (
    filepath_dataset
    .interleave(
        # One line-reader per file; skip(1) drops the header row.
        lambda path: tf.data.TextLineDataset(path).skip(1),
        cycle_length=4,
    )
    .map(preprocess)
    .repeat(2)
    .shuffle(buffer_size=3, seed=73)
)

In [13]:
%%timeit -n5 -r2
for line in dataset:
    pass

2.41 s ± 17.4 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)


##### Reading in parallel

In [14]:
# Parallel variant: both file reading (interleave) and parsing (map) are
# given num_parallel_calls=2.
dataset = (
    filepath_dataset
    .interleave(
        # One line-reader per file; skip(1) drops the header row.
        lambda path: tf.data.TextLineDataset(path).skip(1),
        cycle_length=4,
        num_parallel_calls=2,
    )
    .map(preprocess, num_parallel_calls=2)
    .repeat(2)
    .shuffle(buffer_size=3, seed=73)
)

In [15]:
%%timeit -n5 -r2
for line in dataset:
    pass

1.49 s ± 2.6 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)


##### Caching

* Only if the data fits in memory
* Always after loading and processing, but before shuffling, repeating, batching and prefetching.

In [16]:
# Parallel pipeline with an in-memory cache placed right after parsing and
# before repeat/shuffle, so cached elements are the parsed rows.
dataset = (
    filepath_dataset
    .interleave(
        # One line-reader per file; skip(1) drops the header row.
        lambda path: tf.data.TextLineDataset(path).skip(1),
        cycle_length=4,
        num_parallel_calls=2,
    )
    .map(preprocess, num_parallel_calls=2)
    .cache()
    .repeat(2)
    .shuffle(buffer_size=3, seed=73)
)

In [17]:
%%timeit -n5 -r2
for line in dataset:
    pass

633 ms ± 6.18 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)


##### Prefetching

In [18]:
# Baseline for the prefetching comparison: the parallel pipeline without
# any prefetch stage.
dataset = (
    filepath_dataset
    .interleave(
        # One line-reader per file; skip(1) drops the header row.
        lambda path: tf.data.TextLineDataset(path).skip(1),
        cycle_length=4,
        num_parallel_calls=2,
    )
    .map(preprocess, num_parallel_calls=2)
    .repeat(2)
    .shuffle(buffer_size=3, seed=73)
)

In [19]:
%%timeit -n5 -r2
for line in dataset:
    time.sleep(0.0001)

2.4 s ± 860 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)


In [20]:
# Parallel pipeline with prefetching enabled.
# prefetch is the last stage so that its buffer holds fully prepared
# elements (parsed, repeated and shuffled) that are produced in the
# background while the consumer is still busy with the previous one.
dataset = filepath_dataset.interleave(
    # One line-reader per file; skip(1) drops the header row.
    lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
    cycle_length=4,
    num_parallel_calls=2
)
dataset = dataset.map(preprocess, num_parallel_calls=2)
dataset = dataset.repeat(2).shuffle(buffer_size=3, seed=73)
dataset = dataset.prefetch(2)

In [21]:
%%timeit -n5 -r2
for line in dataset:
    time.sleep(0.0001)

1.58 s ± 109 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)
