## Standard flow control and data processing DataPipes

In [1]:
from torch.utils.data import IterDataPipe

In [2]:
# Example IterDataPipe yielding 0, 1, ..., size - 1
class ExampleIterPipe(IterDataPipe):
    def __init__(self, size=20):
        # `size` avoids shadowing the built-in `range` as a parameter name
        self.size = size
    def __iter__(self):
        for i in range(self.size):
            yield i

## Concat

Function: `concat`

Description: Returns a DataPipe that yields all elements of the first DataPipe, followed by all elements of the second (and any further) DataPipes.

Alternatives:

    `dp = dp + dp2`
    
    `dp = dp.concat(dp2, dp3)`

Example:


In [3]:
dp = ExampleIterPipe(4)
dp2 = ExampleIterPipe(3)
dp = dp.concat(dp2)
for i in dp:
    print(i)

0
1
2
3
0
1
2
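For plain iterables, the ordering that `concat` produces is the same as the ordering from the standard library's `itertools.chain`. The snippet below uses `chain` only to illustrate that ordering; it is not a claim about how the DataPipe is implemented.

```python
from itertools import chain

# chain yields every element of the first iterable, then every element of the next.
first = range(4)
second = range(3)
result = list(chain(first, second))
print(result)  # [0, 1, 2, 3, 0, 1, 2]
```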


## Batch

Function: `batch`

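Description: Groups consecutive elements of the source DataPipe into lists (mini-batches) of `batch_size` elements.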

Alternatives:

Arguments:
  - `batch_size: int` desired batch size
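  - `batch_level: bool = False` if `True`, each element of the source DataPipe (including a whole mini-batch) counts as one object; if `False`, the elements inside incoming mini-batches are counted individually.
  - `drop_last: bool = False` whether to drop the final batch when it has fewer than `batch_size` elements.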

Example:

By default, `batch` keeps the final partial batch.


In [4]:
dp = ExampleIterPipe(10).batch(3)
for i in dp:
    print(i)

[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]


To drop the incomplete final batch, pass `drop_last=True`.

In [5]:
dp = ExampleIterPipe(10).batch(3, drop_last=True)
for i in dp:
    print(i)

[0, 1, 2]
[3, 4, 5]
[6, 7, 8]


Without the `batch_level` override, chained `batch` calls rebatch: the items of the incoming mini-batches are regrouped into batches of the new size.

In [6]:
dp = ExampleIterPipe(10).batch(3).batch(2)
for i in dp:
    print(i)

[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]


Setting `batch_level=True` treats each incoming list as a single object, so the resulting batches are nested.

In [7]:
dp = ExampleIterPipe(10).batch(3).batch(2, batch_level=True)
for i in dp:
    print(i)

[[0, 1, 2], [3, 4, 5]]
[[6, 7, 8], [9]]
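To make the `drop_last` and `batch_level` rules concrete, here is a plain-Python sketch. `batch_sketch` is a hypothetical helper, not part of `torch.utils.data`, and it assumes a mini-batch can be recognised as a Python `list`; the real DataPipe may mark mini-batches differently.

```python
from typing import Any, Iterable, Iterator, List


def batch_sketch(source: Iterable[Any], batch_size: int, drop_last: bool = False,
                 batch_level: bool = False) -> Iterator[List[Any]]:
    """Hypothetical helper mirroring the batching rules shown above."""

    def units() -> Iterator[Any]:
        for element in source:
            if isinstance(element, list) and not batch_level:
                # Rebatching: items inside an incoming mini-batch are counted individually.
                yield from element
            else:
                # batch_level=True (or a plain element): the element counts as one unit.
                yield element

    current: List[Any] = []
    for unit in units():
        current.append(unit)
        if len(current) == batch_size:
            yield current
            current = []
    if current and not drop_last:
        # Keep the trailing partial batch unless drop_last=True.
        yield current


batches = list(batch_sketch(range(10), 3))
print(batches)                                           # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(batch_sketch(batches, 2)))                    # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
print(list(batch_sketch(batches, 2, batch_level=True)))  # [[[0, 1, 2], [3, 4, 5]], [[6, 7, 8], [9]]]
```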


## Map

Function: `map`

Description: Applies a function to every element of the source DataPipe and yields the results.

Alternatives:

Arguments:
  - `batch_level: bool = False` if `True`, the function receives each mini-batch as a whole; if `False`, it is applied to every item inside mini-batches.
 
Example:

In [8]:
dp = ExampleIterPipe(10).map(lambda x: x * 2)
for i in dp:
    print(i)

0
2
4
6
8
10
12
14
16
18


By default, `map` applies the function to every element inside mini-batches.


In [9]:
dp = ExampleIterPipe(10).batch(3).map(lambda x: x * 2)
for i in dp:
    print(i)

[0, 2, 4]
[6, 8, 10]
[12, 14, 16]
[18]


To apply the function to the entire mini-batch, pass `batch_level=True`. Here `x * 2` receives a whole list, so it repeats the list instead of doubling each value.

In [10]:
dp = ExampleIterPipe(10).batch(3).map(lambda x: x * 2, batch_level=True)
for i in dp:
    print(i)

[0, 1, 2, 0, 1, 2]
[3, 4, 5, 3, 4, 5]
[6, 7, 8, 6, 7, 8]
[9, 9]
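The two `map` modes can be sketched as a small generator. `map_sketch` is a hypothetical name, and, as with batching, it assumes mini-batches are plain Python lists.

```python
def map_sketch(source, fn, batch_level=False):
    """Hypothetical illustration of the map rules above (not the library's code)."""
    for element in source:
        if isinstance(element, list) and not batch_level:
            # Default: apply fn to every item inside the mini-batch.
            yield [fn(item) for item in element]
        else:
            # batch_level=True (or a plain element): fn receives the element as a whole.
            yield fn(element)


batches = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(map_sketch(batches, lambda x: x * 2)))
# [[0, 2, 4], [6, 8, 10], [12, 14, 16], [18]]
print(list(map_sketch(batches, lambda x: x * 2, batch_level=True)))
# [[0, 1, 2, 0, 1, 2], [3, 4, 5, 3, 4, 5], [6, 7, 8, 6, 7, 8], [9, 9]]
```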


## Filter

Function: `filter`

Description: Yields only the elements for which the predicate returns `True`.

Alternatives:

Arguments:
  - `batch_level: bool = False` if `True`, the predicate receives each mini-batch as a whole; if `False`, it is applied to every item inside mini-batches.
  - `drop_empty_batches: bool = True` whether mini-batches left empty after filtering are dropped.
 
Example:

In [11]:
dp = ExampleIterPipe(10).filter(lambda x: x % 2 == 0)
for i in dp:
    print(i)

0
2
4
6
8


By default, `filter` applies the predicate to every element inside mini-batches.


In [12]:
dp = ExampleIterPipe(10)
dp = dp.batch(3).filter(lambda x: x % 2 == 0)
for i in dp:
    print(i)

[0, 2]
[4]
[6, 8]


By default, a mini-batch that has no elements left after filtering is dropped from the output.

In [13]:
dp = ExampleIterPipe(10)
dp = dp.batch(3).filter(lambda x: x > 4)
for i in dp:
    print(i)

[5]
[6, 7, 8]
[9]


To keep such empty mini-batches, pass `drop_empty_batches=False`.

In [14]:
dp = ExampleIterPipe(10)
dp = dp.batch(3).filter(lambda x: x > 4, drop_empty_batches=False)
for i in dp:
    print(i)

[]
[5]
[6, 7, 8]
[9]


You can also apply the predicate to each entire mini-batch with the `batch_level` argument.

In [15]:
dp = ExampleIterPipe(10)
dp = dp.batch(3).filter(lambda l: len(l) < 3, batch_level=True)
for i in dp:
    print(i)

[9]
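The filtering rules above, including `drop_empty_batches`, fit in a few lines of plain Python. `filter_sketch` is a hypothetical helper that assumes mini-batches are Python lists; it illustrates the behaviour shown in the outputs rather than the DataPipe's source.

```python
def filter_sketch(source, predicate, batch_level=False, drop_empty_batches=True):
    """Hypothetical illustration of the filter rules above."""
    for element in source:
        if isinstance(element, list) and not batch_level:
            # Default: filter the items inside each mini-batch.
            kept = [item for item in element if predicate(item)]
            # Empty mini-batches are dropped unless drop_empty_batches=False.
            if kept or not drop_empty_batches:
                yield kept
        elif predicate(element):
            # batch_level=True (or a plain element): keep or drop the element as a whole.
            yield element


batches = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(list(filter_sketch(batches, lambda x: x > 4)))                            # [[5], [6, 7, 8], [9]]
print(list(filter_sketch(batches, lambda x: x > 4, drop_empty_batches=False)))  # [[], [5], [6, 7, 8], [9]]
print(list(filter_sketch(batches, lambda b: len(b) < 3, batch_level=True)))     # [[9]]
```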


## Shuffle

Function: `shuffle`

Description: Yields the elements of the source DataPipe in random order.

Alternatives:

Arguments:
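  - `batch_level: bool = False` if `True`, whole mini-batches are shuffled as single objects; if `False`, the items inside mini-batches are shuffled.
  - `buffer_size: int = 10000` size of the shuffle buffer.
  - `cross_shuffle: bool = True` whether items may move between mini-batches; with `False`, items are shuffled only within their own mini-batch.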
 
Example:

In [16]:
dp = ExampleIterPipe(10).shuffle()
for i in dp:
    print(i)

6
4
2
0
9
7
5
3
1
8
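The `buffer_size` argument suggests that `shuffle` works like a classic buffered shuffle: it holds up to `buffer_size` items and, once the buffer is full, emits a random buffered item each time a new one arrives. The sketch below illustrates that technique under this assumption; `buffer_shuffle` is a hypothetical name. In this sketch, an input no larger than the buffer (such as 10 items with the default 10000) ends up fully permuted.

```python
import random
from typing import Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def buffer_shuffle(source: Iterable[T], buffer_size: int = 10000,
                   rng: Optional[random.Random] = None) -> Iterator[T]:
    """Hypothetical bounded-buffer shuffle (assumed, not taken from the library)."""
    rng = rng or random.Random()
    buffer: List[T] = []
    for item in source:
        if len(buffer) < buffer_size:
            buffer.append(item)
        else:
            # Buffer full: emit a random buffered item and store the new one in its slot.
            index = rng.randrange(buffer_size)
            yield buffer[index]
            buffer[index] = item
    # Source exhausted: emit the remaining items in random order.
    rng.shuffle(buffer)
    yield from buffer


print(list(buffer_shuffle(range(10), buffer_size=4, rng=random.Random(42))))
```

A smaller buffer uses less memory but only mixes items that are close together in the stream; with `buffer_size=1` this sketch returns the input unchanged.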


By default, `shuffle` shuffles the items inside mini-batches across all input batches: items can move between mini-batches, while the mini-batch sizes stay the same.

In [17]:
dp = ExampleIterPipe(10).batch(3).shuffle()
for i in dp:
    print(i)

[0, 4, 1]
[9, 7, 8]
[2, 6, 5]
[3]


To shuffle items only within their own mini-batch, without crossing mini-batch boundaries, pass `cross_shuffle=False`.

In [18]:
dp = ExampleIterPipe(10).batch(3).shuffle(cross_shuffle = False)
for i in dp:
    print(i)

[0, 1, 2]
[5, 3, 4]
[8, 7, 6]
[9]


To shuffle whole mini-batches (without changing the order of items inside each mini-batch), pass `batch_level=True`.

In [19]:
dp = ExampleIterPipe(10).batch(3).shuffle(batch_level = True)
for i in dp:
    print(i)

[3, 4, 5]
[9]
[0, 1, 2]
[6, 7, 8]
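The three mini-batch modes can be compared in a small in-memory sketch. It assumes, as the default output above suggests, that cross-batch shuffling keeps the original mini-batch sizes; for simplicity it shuffles the whole input at once rather than through a bounded buffer, and `shuffle_batches_sketch` is a hypothetical name.

```python
import random


def shuffle_batches_sketch(batches, rng, cross_shuffle=True, batch_level=False):
    """Hypothetical illustration of the three shuffle modes for mini-batches."""
    batches = [list(b) for b in batches]  # copy so the input is not mutated
    if batch_level:
        # Reorder whole mini-batches; items inside each batch keep their order.
        rng.shuffle(batches)
        return batches
    if not cross_shuffle:
        # Shuffle inside each mini-batch only; no item crosses a batch boundary.
        for b in batches:
            rng.shuffle(b)
        return batches
    # Default: pool all items, shuffle them, and refill batches of the original sizes.
    pool = [item for b in batches for item in b]
    rng.shuffle(pool)
    result, start = [], 0
    for b in batches:
        result.append(pool[start:start + len(b)])
        start += len(b)
    return result


batches = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
print(shuffle_batches_sketch(batches, random.Random(0)))
print(shuffle_batches_sketch(batches, random.Random(0), cross_shuffle=False))
print(shuffle_batches_sketch(batches, random.Random(0), batch_level=True))
```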


## Unbatch

Function: `unbatch`

Description: Flattens mini-batches back into individual elements.

Alternatives:

Arguments:
 
Example:

In [20]:
dp = ExampleIterPipe(10).batch(3).shuffle().unbatch()
for i in dp:
    print(i)

4
9
6
5
7
8
2
3
1
0
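Unbatching amounts to flattening one level of nesting. The sketch below uses a hypothetical `unbatch_sketch` that assumes mini-batches are Python lists. For instance, flattening the mini-batches `[4, 9, 6]`, `[5, 7, 8]`, `[2, 3, 1]` and `[0]` gives the same sequence as the output above.

```python
def unbatch_sketch(source):
    """Hypothetical illustration: flatten one level of mini-batches."""
    for element in source:
        if isinstance(element, list):
            # Emit the items of a mini-batch one by one.
            yield from element
        else:
            # Plain elements pass through unchanged.
            yield element


print(list(unbatch_sketch([[4, 9, 6], [5, 7, 8], [2, 3, 1], [0]])))
# [4, 9, 6, 5, 7, 8, 2, 3, 1, 0]
```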


## Collate

Function: `collate`

Description: Converts each mini-batch into a tensor.

Alternatives:

Arguments:
 
Example:

In [21]:
dp = ExampleIterPipe(10).batch(3).collate()
for i in dp:
    print(i)

tensor([0, 1, 2])
tensor([3, 4, 5])
tensor([6, 7, 8])
tensor([9])


## GroupBy

Function: `groupby`

Usage: `dp.groupby(lambda x: x[0])`

Description: Batches items by collecting items with the same key into the same group.

Arguments:
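 - `group_key_fn` function that returns the grouping key of an item
 - `group_size` yield a group as soon as it has accumulated `group_size` elements
 - `guaranteed_group_size: int = None` minimum number of elements a yielded group must contain
 - `batch_level` if `True`, `group_key_fn` receives entire mini-batches instead of individual items
 - `buffer_size` capacity of the internal buffer; when it fills up, the largest available group is yielded
 - `drop_remaining` if `True`, groups smaller than `guaranteed_group_size` are dropped instead of raising an exception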

#### Attention
Because a data stream can be arbitrarily large, grouping is done on a best-effort basis: there is no guarantee that items with the same key will not end up in different groups.

In [22]:
dp = ExampleIterPipe(10).map(lambda x: (x % 3, x)).shuffle().groupby(lambda x: x[0])
for i in dp:
    print(i)

[(0, 0), (0, 9), (0, 6), (0, 3)]
[(1, 4), (1, 7), (1, 1)]
[(2, 8), (2, 2), (2, 5)]


By default, grouping is applied to each individual item, including each item inside input mini-batches.

In [23]:
dp = ExampleIterPipe(15).map(lambda x: (x % 3, x)).shuffle().groupby(lambda x: x[0])
for i in dp:
    print(i)

[(1, 4), (1, 1), (1, 10), (1, 7), (1, 13)]
[(2, 2), (2, 5), (2, 8), (2, 14), (2, 11)]
[(0, 12), (0, 9), (0, 0), (0, 6), (0, 3)]


If `batch_level=True`, the group key function receives each entire mini-batch as input. Here mini-batches are grouped by their length.

In [24]:
dp = ExampleIterPipe(10).batch(3).groupby(lambda x: len(x), batch_level = True)
for i in dp:
    print(i)

[[0, 1, 2], [3, 4, 5], [6, 7, 8]]
[[9]]


When the internal buffer (sized by `buffer_size`) fills up, `groupby` yields the largest group available.

In [25]:
dp = ExampleIterPipe(15).map(lambda x: (x % 3, x)).shuffle().groupby(lambda x: x[0], buffer_size = 5)
for i in dp:
    print(i)

[(1, 10), (1, 1)]
[(2, 5), (2, 2), (2, 8)]
[(0, 6), (0, 3), (0, 0)]
[(1, 13), (1, 7)]
[(2, 14), (2, 11)]
[(0, 9), (0, 12)]
[(1, 4)]


With `group_size` set, `groupby` yields a group as soon as it has accumulated `group_size` items.

In [26]:
dp = ExampleIterPipe(9).map(lambda x: (x % 3, x)).shuffle().groupby(lambda x: x[0], group_size = 3)
for i in dp:
    print(i)

[(1, 1), (1, 4), (1, 7)]
[(2, 2), (2, 5), (2, 8)]
[(0, 3), (0, 0), (0, 6)]


Groups left over at the end of the stream must contain at least `guaranteed_group_size` items.

In [27]:
dp = ExampleIterPipe(15).map(lambda x: (x % 3, x)).shuffle().groupby(lambda x: x[0], group_size = 3, guaranteed_group_size = 2)
for i in dp:
    print(i)

[(1, 13), (1, 10), (1, 1)]
[(0, 12), (0, 9), (0, 3)]
[(2, 14), (2, 11), (2, 8)]
[(0, 6), (0, 0)]
[(1, 4), (1, 7)]
[(2, 2), (2, 5)]


Without `group_size`, `groupby` tries to accumulate at least `guaranteed_group_size` items before yielding a group.

In [28]:
dp = ExampleIterPipe(15).map(lambda x: (x % 3, x)).shuffle().groupby(lambda x: x[0], guaranteed_group_size = 2)
for i in dp:
    print(i)

[(0, 6), (0, 0), (0, 12), (0, 3), (0, 9)]
[(2, 5), (2, 11), (2, 8), (2, 14), (2, 2)]
[(1, 1), (1, 4), (1, 10), (1, 7), (1, 13)]


This behaviour becomes noticeable when the data is larger than the buffer and some groups are evicted before all of their items have arrived.

In [29]:
dp = ExampleIterPipe(15).map(lambda x: (x % 3, x)).groupby(lambda x: x[0], guaranteed_group_size = 2, buffer_size = 6)
for i in dp:
    print(i)

[(0, 0), (0, 3)]
[(1, 1), (1, 4), (1, 7)]
[(2, 2), (2, 5), (2, 8)]
[(0, 6), (0, 9), (0, 12)]
[(1, 10), (1, 13)]
[(2, 11), (2, 14)]


With shuffling involved, you may end up with incomplete groups, so the next example is expected to fail in most runs.

In [30]:
dp = ExampleIterPipe(15).map(lambda x: (x % 3, x)).shuffle().groupby(lambda x: x[0], guaranteed_group_size = 2, buffer_size = 6)
for i in dp:
    print(i)

[(2, 8), (2, 2), (2, 14)]
[(1, 7), (1, 13), (1, 1)]
[(0, 9), (0, 12), (0, 6), (0, 3)]
[(2, 11), (2, 5)]
[(1, 10), (1, 4)]


Exception: ('Failed to group items', '[(0, 0)]')

To drop incomplete groups instead of raising this error, pass `drop_remaining=True`.

In [47]:
dp = ExampleIterPipe(15).map(lambda x: (x % 3, x)).shuffle().groupby(lambda x: x[0], guaranteed_group_size = 2, buffer_size = 6, drop_remaining = True)
for i in dp:
    print(i)

[(2, 5), (2, 11), (2, 14), (2, 2)]
[(1, 7), (1, 10), (1, 13), (1, 4), (1, 1)]
[(0, 9), (0, 0), (0, 6), (0, 3), (0, 12)]
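The buffering rules described in this section can be modelled in plain Python. This is a simplified sketch under our own assumptions (a dict of pending groups, eviction of the largest group when the buffer is full, a final flush, and the `guaranteed_group_size` check applied on every release), not the DataPipe's actual implementation; `groupby_sketch` is a hypothetical name. On the unshuffled `buffer_size = 6` example above, this sketch yields the same six groups as the notebook output.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Optional, TypeVar

T = TypeVar("T")


def groupby_sketch(
    items: Iterable[T],
    key_fn: Callable[[T], Hashable],
    buffer_size: int = 10000,
    group_size: Optional[int] = None,
    guaranteed_group_size: Optional[int] = None,
    drop_remaining: bool = False,
) -> Iterator[List[T]]:
    """Hypothetical buffered groupby; a simplified model of the rules above."""
    buffer: Dict[Hashable, List[T]] = defaultdict(list)  # key -> pending items
    held = 0  # total number of buffered items

    def release(group: List[T]) -> Iterator[List[T]]:
        # Enforce the minimum group size before yielding.
        if guaranteed_group_size is not None and len(group) < guaranteed_group_size:
            if drop_remaining:
                return
            raise RuntimeError("Failed to group items", str(group))
        yield group

    for item in items:
        key = key_fn(item)
        buffer[key].append(item)
        held += 1
        if group_size is not None and len(buffer[key]) == group_size:
            # A group reached group_size: yield it immediately.
            held -= group_size
            yield from release(buffer.pop(key))
        elif held >= buffer_size:
            # Buffer full: evict the largest pending group (the earliest one on ties).
            largest = max(buffer, key=lambda k: len(buffer[k]))
            group = buffer.pop(largest)
            held -= len(group)
            yield from release(group)

    # End of stream: flush the remaining groups.
    for group in list(buffer.values()):
        yield from release(group)


pairs = [(x % 3, x) for x in range(15)]
for group in groupby_sketch(pairs, lambda x: x[0], buffer_size=6, guaranteed_group_size=2):
    print(group)
```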


## Zip

Function: `zip`

Description: Combines several DataPipes into one that yields tuples, pairing the i-th elements of all sources.

Alternatives:

Arguments:
 
Example:

In [48]:
_dp = ExampleIterPipe(5).shuffle()
dp = ExampleIterPipe(5).zip(_dp)
for i in dp:
    print(i)

(0, 0)
(1, 3)
(2, 1)
(3, 2)
(4, 4)
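The pairing in the example matches Python's built-in `zip`. The notebook does not show what happens when the sources have different lengths; the built-in stops at the shortest input, and whether the DataPipe does the same is an assumption here. The shuffled list below is only a stand-in for the shuffled second DataPipe.

```python
import random

left = range(5)
right = list(range(5))
random.Random(0).shuffle(right)  # stand-in for the shuffled second DataPipe

# zip pairs the i-th element of each input.
pairs = list(zip(left, right))
print(pairs)

# The built-in zip stops at the shortest input.
print(list(zip(range(3), range(2))))  # [(0, 0), (1, 1)]
```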


## Fork

Function: `fork`

Description: Creates several DataPipes from one source; each of them yields all elements of the source.

Alternatives:

Arguments:
 
Example:

In [49]:
dp = ExampleIterPipe(2)
dp1, dp2, dp3 = dp.fork(3)
for i in dp1 + dp2 + dp3:
    print(i)

0
1
0
1
0
1
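For plain iterators, the standard library's `itertools.tee` behaves similarly: it returns several independent iterators over one source. Because the example consumes `dp1` completely before `dp2` and `dp3`, every item has to be kept until all branches have read it; `tee` does this internally, and we assume `fork` needs comparable buffering.

```python
from itertools import tee

source = iter(range(2))
# tee returns three independent iterators over the same source,
# buffering items that some branches have not consumed yet.
dp1, dp2, dp3 = tee(source, 3)
combined = list(dp1) + list(dp2) + list(dp3)
print(combined)  # [0, 1, 0, 1, 0, 1]
```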
