### tf.data.Dataset

This is a simple exercise to understand `tf.data.Dataset`.

This API supports writing descriptive and efficient input pipelines.

In [1]:
import tensorflow as tf
import numpy as np

Create a NumPy array of integers.

This is the source (input) dataset.

In [2]:
array = np.arange(10)

In [3]:
array

array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

Convert the input data into a TensorFlow `Dataset` object by slicing it with `from_tensor_slices`, so that each array entry becomes one dataset element.

In [4]:
input_dataset = tf.data.Dataset.from_tensor_slices(array)


In [5]:
input_dataset

<DatasetV1Adapter shapes: (), types: tf.int32>

##### one shot iterator
The next step is to create an iterator over the elements of the dataset using `make_one_shot_iterator`.

An iterator created this way can be initialized and run only once; it cannot be re-initialized.

In [6]:
iterator = input_dataset.make_one_shot_iterator()

Instructions for updating:
Use `for ... in dataset:` to iterate over a dataset. If using `tf.estimator`, return the `Dataset` object directly from your input function. As a last resort, you can use `tf.compat.v1.data.make_one_shot_iterator(dataset)`.


After the iterator is created, the next step is to set up a TensorFlow operation that extracts elements from the dataset.

In [7]:
next_data = iterator.get_next()

To see how the data is extracted, we can run the operation in a session.

If eager execution is enabled, we can print `next_data` directly.

In [8]:
with tf.Session() as sess:
    for i in range(10):
        print(sess.run(next_data))

0
1
2
3
4
5
6
7
8
9


### batch

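Now let's explore the `batch` function of `Dataset`.

`batch()` is a method of `tf.data.Dataset` that groups consecutive elements into batches of `batch_size`.

The examples below use batch sizes of 2 and 3. With a batch size of 3, the last batch holds only the one leftover element.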

In [9]:
input_dataset_batch2 = input_dataset.batch(2)

iterator = input_dataset_batch2.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
            print("Done")
    except tf.errors.OutOfRangeError:  # raised once the iterator is exhausted
        pass

[0 1]
Done
[2 3]
Done
[4 5]
Done
[6 7]
Done
[8 9]
Done


In [10]:
input_dataset_batch = input_dataset.batch(3)

iterator = input_dataset_batch.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

[0 1 2]
[3 4 5]
[6 7 8]
[9]
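
The grouping that `batch` performs can be imitated in plain Python. The sketch below only illustrates the behaviour seen above and is not TensorFlow's implementation; the helper name `batch_elements` is made up for this example. Its `drop_remainder` flag mirrors the `drop_remainder` argument of `Dataset.batch`, which discards a final batch smaller than `batch_size`.

```python
def batch_elements(items, batch_size, drop_remainder=False):
    """Group consecutive items into lists of length batch_size."""
    current = []
    for item in items:
        current.append(item)
        if len(current) == batch_size:
            yield current
            current = []
    # Whatever is left over forms a final, smaller batch unless it is dropped.
    if current and not drop_remainder:
        yield current


batches = list(batch_elements(range(10), 3))
full_batches = list(batch_elements(range(10), 3, drop_remainder=True))
print(batches)       # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(full_batches)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
```

Dropping the remainder is useful when every batch must have the same shape.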


### Shuffle

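`shuffle()` randomly shuffles the elements of the dataset. It requires a `buffer_size` argument: the number of elements held in the buffer from which the next element is drawn at random.

Observe the order of the elements in the examples below.

`shuffle()` after `batch()` shuffles the batches.

`shuffle()` before `batch()` shuffles the elements and then creates batches.

Both examples use `buffer_size` = 10.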

In [11]:
input_dataset_batch_shuffle = input_dataset.batch(2).shuffle(10)

iterator = input_dataset_batch_shuffle.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

[2 3]
[0 1]
[8 9]
[6 7]
[4 5]


In [12]:
input_dataset_shuffle_batch = input_dataset.shuffle(10).batch(2)

iterator = input_dataset_shuffle_batch.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

[8 6]
[0 2]
[4 3]
[9 7]
[5 1]


This shows that `shuffle` after `batch` only reorders whole batches; the elements inside each batch stay together.
It is usually better to `shuffle` first and then `batch`, so that the data is shuffled across batches.
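
The role of the shuffle buffer can be sketched in plain Python. This is a simplified model, assuming the buffer is filled first and then, for each new input element, one randomly chosen buffered element is emitted and replaced. It is not TensorFlow's actual code, and `buffered_shuffle` and its `seed` parameter are hypothetical names used only here.

```python
import random


def buffered_shuffle(items, buffer_size, seed=None):
    """Yield items in an order randomised through a fixed-size buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in items:
        if len(buffer) < buffer_size:
            buffer.append(item)  # still filling the buffer
            continue
        # Emit a random buffered element and put the new item in its place.
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    # Input exhausted: emit what remains in random order.
    rng.shuffle(buffer)
    yield from buffer


print(list(buffered_shuffle(range(10), buffer_size=1)))   # order unchanged
print(list(buffered_shuffle(range(10), buffer_size=5)))   # locally shuffled
print(list(buffered_shuffle(range(10), buffer_size=10)))  # fully shuffled
```

In this model an element can move forward only as far as the buffer allows, so a `buffer_size` of 1 leaves the order unchanged, while a buffer at least as large as the dataset (as in the examples above, where both are 10) shuffles it completely.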

### zip
If we have multiple datasets to combine, for example training values and training labels, we use `zip`.

`zip` combines the given datasets element by element into a dataset of tuples.

In [13]:
combined_dataset = tf.data.Dataset.zip((input_dataset, input_dataset)).batch(3)

iterator = combined_dataset.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

(array([0, 1, 2]), array([0, 1, 2]))
(array([3, 4, 5]), array([3, 4, 5]))
(array([6, 7, 8]), array([6, 7, 8]))
(array([9]), array([9]))
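
Conceptually this pairing resembles Python's built-in `zip`. The plain-Python sketch below is only an analogy; the `features` and `labels` lists are invented for illustration.

```python
features = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
labels = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

# Pair each feature with the label at the same position.
pairs = list(zip(features, labels))
print(pairs[:3])  # [(0, 10), (1, 11), (2, 12)]

# With inputs of unequal length, zip stops at the shortest one.
short_pairs = list(zip(features, labels[:4]))
print(len(short_pairs))  # 4
```

`tf.data.Dataset.zip` likewise yields only as many elements as its smallest input dataset.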


### map()
To apply a function to a dataset, use `map`.

`map` applies a transformation to each element of the input dataset.

In [14]:
input_dataset_fnc = input_dataset.map(lambda x: x+2)

iterator = input_dataset_fnc.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

2
3
4
5
6
7
8
9
10
11


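Apply the same function again, this time with `num_parallel_calls=-1` (the value of `tf.data.experimental.AUTOTUNE`), which lets `tf.data` choose the level of parallelism. The result is unchanged.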

In [15]:
input_dataset_fnc_2 = input_dataset.map(lambda x: x+2, num_parallel_calls=-1)

iterator = input_dataset_fnc_2.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

2
3
4
5
6
7
8
9
10
11


### repeat()

Now we explore the `repeat()` function of `tf.data.Dataset`.

`repeat` takes an argument `count`, the number of times the dataset is repeated.

> **NOTE**: when `repeat()` is called without an argument, the dataset repeats indefinitely and `OutOfRangeError` is never raised.

In [16]:
input_dataset_fnc_3 = input_dataset.map(lambda x: x+2).repeat(count=3).batch(3)

iterator = input_dataset_fnc_3.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

[2 3 4]
[5 6 7]
[ 8  9 10]
[11  2  3]
[4 5 6]
[7 8 9]
[10 11  2]
[3 4 5]
[6 7 8]
[ 9 10 11]


#### one hot encoding

To convert labels to one-hot vectors, we can apply TensorFlow's built-in function `tf.one_hot` to every element of the dataset using `map()`.

In [17]:
input_dataset_hot = input_dataset.map(lambda x: tf.one_hot(x,10)).batch(2)

iterator = input_dataset_hot.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

[[1. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 1. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 1. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 1. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 1. 0. 0. 0. 0.]]
[[0. 0. 0. 0. 0. 0. 1. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]]
[[0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]]
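
What the encoding does to a single label can be written out in plain Python. This is a minimal sketch with a hypothetical helper named `one_hot`; it mimics the rows printed above and is not how `tf.one_hot` is implemented.

```python
def one_hot(index, depth):
    """Return a list of length depth with 1.0 at position index, 0.0 elsewhere."""
    return [1.0 if i == index else 0.0 for i in range(depth)]


encoded = [one_hot(label, 10) for label in range(10)]
print(encoded[3])  # [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
```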


from <https://www.tensorflow.org/guide/performance/datasets#map_and_interleave_prefetch_shuffle>

The tf.data.Dataset.repeat transformation repeats the input data a finite (or infinite) number of times; each repetition of the data is typically referred to as an epoch. The tf.data.Dataset.shuffle transformation randomizes the order of the dataset's examples.

If the repeat transformation is applied before the shuffle transformation, then the epoch boundaries are blurred. That is, certain elements can be repeated before other elements appear even once. On the other hand, if the shuffle transformation is applied before the repeat transformation, then performance might slow down at the beginning of each epoch related to initialization of the internal state of the shuffle transformation. In other words, the former (repeat before shuffle) provides better performance, while the latter (shuffle before repeat) provides stronger ordering guarantees.

"We recommend applying the shuffle transformation before the repeat transformation."

##### Let us observe a few combinations of these functions

In [18]:
input_dataset_s_b_r = input_dataset.shuffle(5).batch(2).repeat(2)

iterator = input_dataset_s_b_r.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

[4 0]
[1 5]
[3 6]
[7 8]
[2 9]
[3 2]
[6 5]
[4 8]
[7 9]
[0 1]


In [19]:
input_dataset_s_r_b = input_dataset.shuffle(5).repeat(2).batch(2)

iterator = input_dataset_s_r_b.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

[1 4]
[0 3]
[8 6]
[2 5]
[9 7]
[3 4]
[0 6]
[8 9]
[5 7]
[1 2]


In [20]:
input_dataset_r_s_b = input_dataset.repeat(2).shuffle(5).batch(2)

iterator = input_dataset_r_s_b.make_one_shot_iterator()
next_data = iterator.get_next()

with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_data))
    except tf.errors.OutOfRangeError:
        pass

[1 5]
[6 7]
[4 0]
[3 0]
[8 1]
[4 2]
[3 5]
[2 9]
[7 6]
[9 8]


###### Conclusion
From the above examples we can make out the following (consider an epoch as one pass over the data, i.e. one set of batches (0,1), (2,3), ...):

1) with `shuffle` followed by `repeat`, the data is shuffled within each epoch, i.e. there is a clear epoch boundary

2) with `repeat` followed by `shuffle`, there is no clear boundary; the same element can appear twice before every element has appeared once
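
The two orderings can be contrasted in plain Python. The sketch below is a simplification under a stated assumption: `random.shuffle` stands in for a shuffle whose buffer covers all the data it receives, whereas the cells above use a buffer of 5. The function names are made up for this illustration.

```python
import random

data = list(range(10))
rng = random.Random(0)


def shuffle_then_repeat(items, count):
    """Shuffle each pass separately, then concatenate the passes."""
    out = []
    for _ in range(count):
        epoch = list(items)
        rng.shuffle(epoch)
        out.extend(epoch)
    return out


def repeat_then_shuffle(items, count):
    """Concatenate the passes first, then shuffle everything together."""
    out = list(items) * count
    rng.shuffle(out)
    return out


s_r = shuffle_then_repeat(data, 2)
r_s = repeat_then_shuffle(data, 2)
print(s_r[:10], s_r[10:])  # each half contains every element exactly once
print(r_s[:10], r_s[10:])  # halves may contain duplicates and gaps
```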

We can also access the individual elements of each batch.

In [21]:
ex1 = input_dataset.batch(2)
iterator = ex1.make_one_shot_iterator()
next_image = iterator.get_next()

with tf.Session() as sess:
    for i in range(5):
        num = sess.run(next_image)
        print(num)
        print(num[0])
        print(num[1])

[0 1]
0
1
[2 3]
2
3
[4 5]
4
5
[6 7]
6
7
[8 9]
8
9


### flat_map

Use `flat_map` if you want to make sure that the order of your dataset stays the same. 

For example, to flatten a dataset of batches into a dataset of their elements (the function below also adds 1 to each element):

In [22]:
ary_2 = np.arange(0,12)
a = tf.data.Dataset.from_tensor_slices(ary_2).batch(2)
a = a.flat_map(lambda x: tf.data.Dataset.from_tensor_slices(x+1))

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()


with tf.Session() as sess:
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass

1
2
3
4
5
6
7
8
9
10
11
12
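
The same "map each batch to a sequence, then concatenate in order" idea can be expressed in plain Python with `itertools.chain`. This is only an analogy to the cell above, using a shorter, made-up `batches` list.

```python
from itertools import chain

batches = [[0, 1], [2, 3], [4, 5]]

# Map each batch to a sequence (here: every element plus 1),
# then concatenate the sequences in their original order.
flattened = list(chain.from_iterable([x + 1 for x in batch] for batch in batches))
print(flattened)  # [1, 2, 3, 4, 5, 6]
```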


### interleave

`interleave` maps a function across the dataset and interleaves the results.

It draws elements from several batches of the dataset in turn; `cycle_length` sets how many batches are read from at the same time.

Below are examples with different values of `cycle_length`.

In [23]:
ary_2 = np.arange(0,12)
a = tf.data.Dataset.from_tensor_slices(ary_2).batch(2)
# a = a.flat_map(lambda x: tf.data.Dataset.from_tensor_slices(x+1))

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()


with tf.Session() as sess:
    print("This is with just batch size of 2")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass


ary_3 = tf.data.Dataset.range(0,12).batch(2)

# a = tf.data.Dataset.from_tensor_slices(ary_2).batch(2)
a = ary_3.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x),
                                                     cycle_length=2)


iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()


with tf.Session() as sess:
    print("This is with batch size of 2, cycle length 2")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass
    


# a = tf.data.Dataset.from_tensor_slices(ary_2).batch(2)
a = ary_3.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x),
                                                     cycle_length=4)


iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()


with tf.Session() as sess:
    print("This is with batch size of 2, cycle length 4")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass
    


# a = tf.data.Dataset.from_tensor_slices(ary_2).batch(2)
a = ary_3.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x),
                                                     cycle_length=6)


iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()


with tf.Session() as sess:
    print("This is with batch size of 2, cycle length 6")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass
    
print("This implies cycle_length controls the number of input elements that are processed concurrently.")

This is with just batch size of 2
[0 1]
[2 3]
[4 5]
[6 7]
[8 9]
[10 11]
This is with batch size of 2, cycle length 2
0
2
1
3
4
6
5
7
8
10
9
11
This is with batch size of 2, cycle length 4
0
2
4
6
1
3
5
7
8
10
9
11
This is with batch size of 2, cycle length 6
0
2
4
6
8
10
1
3
5
7
9
11
This implies cycle_length controls the number of input elements that are processed concurrently.


`block_length`

To take more than one consecutive element from each batch during interleaving, use `block_length`.

In [24]:
ary_2 = np.arange(0, 12)
a = tf.data.Dataset.from_tensor_slices(ary_2).batch(4)

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with just batch size of 4")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass


ary_3 = tf.data.Dataset.range(0, 12).batch(4)

# Take one element at a time from each of two batches.
a = ary_3.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x),
                     cycle_length=2, block_length=1)

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 4, cycle length 2, block_length 1")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass


# Take two consecutive elements from each batch before switching.
a = ary_3.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x),
                     cycle_length=2, block_length=2)

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 4, cycle length 2, block_length 2")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass


# Take three consecutive elements from each batch before switching.
a = ary_3.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x),
                     cycle_length=2, block_length=3)

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 4, cycle length 2, block_length 3")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass

print("This implies block_length controls the number of consecutive elements "
      "to produce from each input element before cycling to another input element.")

This is with just batch size of 4
[0 1 2 3]
[4 5 6 7]
[ 8  9 10 11]
This is with batch size of 4, cycle length 2, block_length 1
0
4
1
5
2
6
3
7
8
9
10
11
This is with batch size of 4, cycle length 2, block_length 2
0
1
4
5
2
3
6
7
8
9
10
11
This is with batch size of 4, cycle length 2, block_length 3
0
1
2
4
5
6
3
7
8
9
10
11
This implies block_length controls the number of consecutive elements to produce from each input element before cycling to another input element.
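
The orderings printed above can be reproduced with a small pure-Python model of the round-robin behaviour. This is a sketch under assumptions, not TensorFlow's implementation: it is sequential, the generator name `interleave` and its arguments are chosen here to echo the API, and it has only been checked against the outputs shown in this notebook.

```python
def interleave(inputs, cycle_length, block_length=1):
    """Round-robin over up to cycle_length inputs, block_length items at a time."""
    source = iter(inputs)
    slots = [None] * cycle_length  # one open iterator per slot
    source_exhausted = False
    while True:
        any_open = False
        for i in range(cycle_length):
            # Refill an empty slot with the next unread input, if there is one.
            if slots[i] is None and not source_exhausted:
                try:
                    slots[i] = iter(next(source))
                except StopIteration:
                    source_exhausted = True
            if slots[i] is None:
                continue
            any_open = True
            # Emit up to block_length consecutive items from this slot.
            for _ in range(block_length):
                try:
                    yield next(slots[i])
                except StopIteration:
                    slots[i] = None  # refilled on the next visit
                    break
        if not any_open:
            return


batches_of_4 = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
for block_length in (1, 2, 3):
    print(block_length,
          list(interleave(batches_of_4, cycle_length=2, block_length=block_length)))
```

A slot whose batch runs out is only refilled the next time the cycle reaches it, which is why 3 and 7 appear before 8 in the `block_length=3` case.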


A few miscellaneous examples

In [25]:
def parse_fn(x):
    # Placeholder for a real parsing step; returns the element unchanged.
    return x

ary_4 = tf.data.Dataset.range(0, 12).batch(6)

iterator = ary_4.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 6")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass

a = ary_4.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x).map(parse_fn),
                     cycle_length=2, block_length=1).shuffle(2)

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 6, cycle length 2, block_length 1, shuffle 2")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass

a = ary_4.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x).map(parse_fn),
                     cycle_length=2, block_length=1).shuffle(1)

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 6, cycle length 2, block_length 1, shuffle 1")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass

a = ary_4.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x).map(parse_fn),
                     cycle_length=2, block_length=1).shuffle(4)

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 6, cycle length 2, block_length 1, shuffle 4")
    try:
        while True:
            print(sess.run(next_one))
            print("DONE...")
    except tf.errors.OutOfRangeError:
        pass

This is with batch size of 6
[0 1 2 3 4 5]
[ 6  7  8  9 10 11]
This is with batch size of 6, cycle length 2, block_length 1, shuffle 2
0
1
7
2
8
3
6
4
10
5
9
11
This is with batch size of 6, cycle length 2, block_length 1, shuffle 1
0
6
1
7
2
8
3
9
4
10
5
11
This is with batch size of 6, cycle length 2, block_length 1, shuffle 4
0
DONE...
6
DONE...
8
DONE...
1
DONE...
9
DONE...
3
DONE...
2
DONE...
5
DONE...
7
DONE...
4
DONE...
10
DONE...
11
DONE...


In [26]:
def parse_fn(x):
    # Placeholder for a real parsing step; returns the element unchanged.
    return x

ary_4 = tf.data.Dataset.range(0, 12).batch(6)

iterator = ary_4.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 6")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass

# Interleave the two batches, shuffle lightly, then re-batch in pairs.
a = ary_4.interleave(lambda x: tf.data.Dataset.from_tensor_slices(x).map(parse_fn),
                     cycle_length=2, block_length=1).shuffle(2).batch(2)

iterator = a.make_one_shot_iterator()
next_one = iterator.get_next()

with tf.Session() as sess:
    print("This is with batch size of 6, cycle length 2, block_length 1, shuffle 2, then batch size of 2")
    try:
        while True:
            print(sess.run(next_one))
    except tf.errors.OutOfRangeError:
        pass

This is with batch size of 6
[0 1 2 3 4 5]
[ 6  7  8  9 10 11]
This is with batch size of 6, cycle length 2, block_length 1, shuffle 2, then batch size of 2
[6 1]
[0 7]
[8 3]
[9 4]
[2 5]
[10 11]
