# Combination operations

Previously, in our examples of the fan-in and fan-out patterns, we have seen that channels can act as versatile conduits for the flow of data. Here we discuss some convenient functions and constructs for combining data from, and distributing data to, several channels in more complicated patterns.

## Merging values

In fan-in, we passed a single channel to all the parties interested in producing data. Often, however, the producers provide their own output channels instead. In this case, we can `merge` these channels into a single one:

In [2]:
import aiochan as ac
import asyncio

async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.merge(p1, p2)
    print(await out.collect())
    
ac.run(main())

['p1-0', 'p2-0', 'p2-1', 'p1-1', 'p1-2', 'p2-2', 'p1-3', 'p2-3', 'p1-4', 'p2-4', 'p1-5', 'p2-5', 'p2-6', 'p1-6', 'p1-7', 'p2-7', 'p1-8', 'p2-8', 'p1-9', 'p2-9']
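
To make the semantics concrete, here is a minimal plain-`asyncio` sketch of what merging does. It is not aiochan's implementation: it assumes `asyncio.Queue` objects as stand-ins for channels and a hypothetical `CLOSED` sentinel in place of closing a channel, and `producer`, `merge_queues` and `collect` are illustrative names. The `close` flag mimics a `close` argument: the output is closed only once every source is exhausted.

```python
import asyncio

CLOSED = object()  # hypothetical sentinel standing in for "channel closed"


async def producer(prefix, n, q):
    """Put prefix-0 .. prefix-(n-1) onto q, then signal closing."""
    for i in range(n):
        await q.put(f'{prefix}-{i}')
        await asyncio.sleep(0)  # yield so that the producers interleave
    await q.put(CLOSED)


async def merge_queues(sources, out, close=True):
    """Forward every value from every source queue onto `out`."""
    async def pump(src):
        while (v := await src.get()) is not CLOSED:
            await out.put(v)

    await asyncio.gather(*(pump(s) for s in sources))
    if close:  # close the output only once every source is exhausted
        await out.put(CLOSED)


async def collect(q):
    """Drain q into a list until the closing sentinel arrives."""
    items = []
    while (v := await q.get()) is not CLOSED:
        items.append(v)
    return items


async def main():
    p1, p2, out = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    *_, merged = await asyncio.gather(
        producer('p1', 3, p1), producer('p2', 3, p2),
        merge_queues([p1, p2], out), collect(out))
    return merged


result = asyncio.run(main())
print(result)  # the interleaving between p1 and p2 may vary
```

Values from each producer keep their relative order, but how the two producers interleave depends on scheduling.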


As in manual fan-in, the order of the values is somewhat non-deterministic. If you want to take values from your channels in lockstep, one value from each channel at a time, use `zip_chans`:

In [3]:
async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.zip_chans(p1, p2)
    print(await out.collect())
    
ac.run(main())

[['p1-0', 'p2-0'], ['p1-1', 'p2-1'], ['p1-2', 'p2-2'], ['p1-3', 'p2-3'], ['p1-4', 'p2-4'], ['p1-5', 'p2-5'], ['p1-6', 'p2-6'], ['p1-7', 'p2-7'], ['p1-8', 'p2-8'], ['p1-9', 'p2-9']]
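
A comparable sketch for zipping, under the same assumptions (queues as channels, a hypothetical `CLOSED` sentinel, illustrative helper names). Each round waits for one value from every source, so the slower producer sets the pace. Stopping as soon as any source is exhausted is a choice made for this sketch, not documented aiochan behavior.

```python
import asyncio

CLOSED = object()  # hypothetical sentinel standing in for "channel closed"


async def producer(prefix, n, q, delay):
    for i in range(n):
        await asyncio.sleep(delay)  # the producers run at different speeds
        await q.put(f'{prefix}-{i}')
    await q.put(CLOSED)


async def zip_queues(sources, out):
    """Emit one list per round, holding exactly one value from each source."""
    while True:
        # Wait until every source has delivered its next value.
        row = await asyncio.gather(*(src.get() for src in sources))
        if any(v is CLOSED for v in row):
            await out.put(CLOSED)  # sketch's choice: stop once any source ends
            return
        await out.put(row)


async def collect(q):
    items = []
    while (v := await q.get()) is not CLOSED:
        items.append(v)
    return items


async def main():
    p1, p2, out = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    *_, zipped = await asyncio.gather(
        producer('p1', 3, p1, 0.0), producer('p2', 3, p2, 0.002),
        zip_queues([p1, p2], out), collect(out))
    return zipped


result = asyncio.run(main())
print(result)  # [['p1-0', 'p2-0'], ['p1-1', 'p2-1'], ['p1-2', 'p2-2']]
```

Even though `p1` runs faster than `p2`, the pairs line up by position, which is what makes the output deterministic.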


An even more complicated use case is when your producers produce items at different rates, and you want to keep track of the *latest* value produced by each of them:

In [4]:
async def main():
    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))
    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))
    out = ac.combine_latest(p1, p2)
    print(await out.collect())
    
ac.run(main())

[['p1-0', None], ['p1-0', 'p2-0'], ['p1-0', 'p2-1'], ['p1-1', 'p2-1'], ['p1-2', 'p2-1'], ['p1-2', 'p2-2'], ['p1-2', 'p2-3'], ['p1-3', 'p2-3'], ['p1-3', 'p2-4'], ['p1-4', 'p2-4'], ['p1-5', 'p2-4'], ['p1-5', 'p2-5'], ['p1-5', 'p2-6'], ['p1-6', 'p2-6'], ['p1-6', 'p2-7'], ['p1-7', 'p2-7'], ['p1-7', 'p2-8'], ['p1-8', 'p2-8'], ['p1-8', 'p2-9'], ['p1-9', 'p2-9']]


Notice that for channels that have yet to produce a value, `None` is put in their place.
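
A sketch of the combine-latest idea, again with `asyncio.Queue` objects standing in for channels and a hypothetical `CLOSED` sentinel; `combine_latest_queues` and the other helpers are illustrative, not aiochan API. Every value from any source updates a shared list of latest values, and a copy of that list is emitted. Sources that have not produced anything yet show up as `None`, as in the output above.

```python
import asyncio

CLOSED = object()  # hypothetical sentinel standing in for "channel closed"


async def producer(prefix, n, q, delay):
    for i in range(n):
        await asyncio.sleep(delay)  # different delays give different rates
        await q.put(f'{prefix}-{i}')
    await q.put(CLOSED)


async def combine_latest_queues(sources, out):
    """On every new value from any source, emit a snapshot of the latest values."""
    latest = [None] * len(sources)  # None until that source has produced a value

    async def pump(i, src):
        while (v := await src.get()) is not CLOSED:
            latest[i] = v
            await out.put(list(latest))  # copy, so later updates don't alter it

    await asyncio.gather(*(pump(i, s) for i, s in enumerate(sources)))
    await out.put(CLOSED)  # every source is exhausted


async def collect(q):
    items = []
    while (v := await q.get()) is not CLOSED:
        items.append(v)
    return items


async def main():
    p1, p2, out = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    *_, rows = await asyncio.gather(
        producer('p1', 3, p1, 0.001), producer('p2', 2, p2, 0.003),
        combine_latest_queues([p1, p2], out), collect(out))
    return rows


result = asyncio.run(main())
for row in result:
    print(row)
```

One snapshot is emitted per input value (five here), and the last one always holds the final value of each source.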

As with the functional methods, all of these functions take optional `out` and `close` arguments, controlling the output channel and whether to close it when there is nothing more to be done (i.e., when all source channels are closed).

Rarely, you will need even more control: you want to *dynamically* choose which channels are producing data *at each instant*. In this case you can use a *multiplexer*, the `aiochan.Mux` class. As its use is fairly advanced, we refer interested readers to the class documentation.

To recap:

* use `merge` to combine values from several channels
* use `zip_chans` to combine values from several channels in sync
* use `combine_latest` to combine values and monitor the latest value from each channel
* `Mux` is available for advanced merging tasks

## Distributing values

We have discussed generalizations of fan-in above. For simple fan-out, we have `distribute`:

In [5]:
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    inputs = [ac.Chan(name='inp%s' % i) for i in range(3)]
    ac.from_range(20).distribute(*inputs)
    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)
    
ac.run(main())

worker0 received 0
worker2 received 1
worker2 received 4
worker0 received 2
worker1 received 3
worker0 received 5
worker0 received 8
worker1 received 6
worker2 received 7
worker2 received 9
worker2 received 12
worker0 received 10
worker1 received 11
worker2 received 13
worker2 received 16
worker1 received 14
worker0 received 15
worker1 received 17
worker2 received 18
worker0 received 19


One of the benefits of using `distribute` instead of a plain fan-out is that, if one of the downstream channels is closed, `distribute` will try to put the value into another downstream channel so that no values are lost.
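
The redirect-on-close idea can be sketched without any concurrency at all. The `Downstream` class and `distribute_round_robin` function below are hypothetical; in particular, round-robin is a swapped-in simplification, since the output above suggests that aiochan hands each value to whichever downstream channel happens to be ready.

```python
from itertools import cycle


class Downstream:
    """Hypothetical stand-in for a downstream channel: a buffer plus a closed flag."""
    def __init__(self, name):
        self.name = name
        self.items = []
        self.closed = False


def distribute_round_robin(values, targets):
    """Hand each value to the next open target, skipping closed ones.

    Returns the values nobody could take (only possible once every target is closed).
    """
    ring = cycle(targets)
    undelivered = []
    for v in values:
        for _ in range(len(targets)):  # try each target at most once per value
            t = next(ring)
            if not t.closed:
                t.items.append(v)
                break
        else:
            undelivered.append(v)  # every downstream target is closed
    return undelivered


workers = [Downstream(f'worker{i}') for i in range(3)]
workers[1].closed = True  # pretend worker1's channel has been closed
lost = distribute_round_robin(range(6), workers)
for w in workers:
    print(w.name, w.items)
# worker0 [0, 2, 4]
# worker1 []
# worker2 [1, 3, 5]
print('lost:', lost)  # lost: []
```

The values meant for the closed `worker1` are redirected to the remaining workers, so nothing is lost while at least one target is open.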

In fan-out and `distribute`, each downstream consumer obtains a non-overlapping part of the input. Sometimes we want each consumer to consume the whole input instead. In this case we want a duplicator, or `Dup` in aiochan. An example:

In [6]:
async def worker(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))
        
async def main():
    dup = ac.from_range(5).dup()
    inputs = [dup.tap() for i in range(3)]

    for idx, c in enumerate(inputs):
        ac.go(worker(c, 'worker%s' % idx))
    await asyncio.sleep(0.1)
    
ac.run(main())

worker0 received 0
worker1 received 0
worker1 received 1
worker2 received 0
worker0 received 1
worker2 received 1
worker2 received 2
worker0 received 2
worker1 received 2
worker0 received 3
worker0 received 4
worker1 received 3
worker2 received 3
worker1 received 4
worker2 received 4


Note that duplicated elements are put onto the downstream channels in order, one channel after another. This means that if any one of the downstream channels blocks on put for some reason, all progress is blocked. You should consider giving the downstream channels some buffer if your downstream processors are uneven in their processing speed.
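
The following sketch illustrates why buffers matter for a duplicator that puts values onto its taps one after another. It uses bounded `asyncio.Queue` objects as taps and a hypothetical `CLOSED` sentinel; `dup_queues`, `consume` and `run` are illustrative names, not aiochan API. With a buffer of one item, the fast consumer cannot finish until the slow one has nearly caught up; with a buffer large enough for the whole input, it finishes almost immediately.

```python
import asyncio

CLOSED = object()  # hypothetical sentinel standing in for "channel closed"


async def dup_queues(source, taps):
    """Copy every value from `source` onto every tap, one tap after another."""
    while (v := await source.get()) is not CLOSED:
        for tap in taps:
            await tap.put(v)  # a full tap stalls delivery to every tap
    for tap in taps:
        await tap.put(CLOSED)


async def consume(q, delay):
    """Collect values from q, pausing `delay` seconds per value (simulated work)."""
    items = []
    while (v := await q.get()) is not CLOSED:
        items.append(v)
        await asyncio.sleep(delay)
    return items


async def run(buffer_size):
    """Dup 0..4 to a fast and a slow consumer; time when the fast one finishes."""
    loop = asyncio.get_running_loop()
    source = asyncio.Queue()
    for v in [*range(5), CLOSED]:
        source.put_nowait(v)
    taps = [asyncio.Queue(maxsize=buffer_size) for _ in range(2)]
    start = loop.time()

    async def fast():
        items = await consume(taps[0], 0.0)
        return items, loop.time() - start

    _, (fast_items, fast_elapsed), slow_items = await asyncio.gather(
        dup_queues(source, taps), fast(), consume(taps[1], 0.01))
    return fast_items, fast_elapsed, slow_items


for size in (1, 10):
    fast_items, fast_elapsed, slow_items = asyncio.run(run(size))
    print(f'buffer={size}: fast consumer finished after {fast_elapsed:.3f}s')
```

Both consumers always receive the full input; only the timing of the fast consumer changes with the buffer size.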

A `Dup` also has the method `untap`, which can be used to remove an existing tap. For more information, please read the documentation for `Dup`.

Another very common idiom is pub-sub, and this is easy to do as well:

In [7]:
async def processor(inp, tag):
    async for v in inp:
        print('%s received %s' % (tag, v))

async def main():
    source = ac.Chan()
    pub = source.pub(lambda x: x % 3)
    p1 = pub.sub(1)
    p2 = pub.sub(2)
    p0 = pub.sub(0)
    px = pub.sub(0)
    ac.go(processor(p1, 'p1'))
    ac.go(processor(p2, 'p2'))
    ac.go(processor(p0, 'p0'))
    ac.go(processor(px, 'px'))
    source.add(0,1,2,3,4,5,6,7,8,9)
    await asyncio.sleep(0.1)
    
ac.run(main())

p0 received 0
px received 0
px received 3
p0 received 3
p1 received 1
p2 received 2
p0 received 6
px received 6
p1 received 4
p2 received 5
p1 received 7
p2 received 8
p0 received 9
px received 9


In this case, the topic is defined by the lambda, which gives the remainder when the item is divided by three. Processors subscribe to the topics they are interested in, and we see that `p0` and `px` received all numbers with remainder 0, `p1` all numbers with remainder 1, and `p2` all numbers with remainder 2.
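
Topic-based routing can be sketched in plain `asyncio` as well. `TopicPub` is a hypothetical class, not aiochan's `Pub`: it keeps a list of subscriber queues per topic and forwards each value to every subscriber of `topic_fn(value)`, using a hypothetical `CLOSED` sentinel for closing. Dropping values whose topic has no subscribers is a choice made for this sketch.

```python
import asyncio
from collections import defaultdict

CLOSED = object()  # hypothetical sentinel standing in for "channel closed"


class TopicPub:
    """Hypothetical publisher: routes each value to the subscribers of its topic."""
    def __init__(self, topic_fn):
        self.topic_fn = topic_fn
        self.subs = defaultdict(list)  # topic -> list of subscriber queues

    def sub(self, topic):
        q = asyncio.Queue()
        self.subs[topic].append(q)
        return q

    def unsub(self, topic, q):
        self.subs[topic].remove(q)

    async def run(self, source):
        while (v := await source.get()) is not CLOSED:
            # In this sketch, values whose topic has no subscribers are dropped.
            for q in self.subs.get(self.topic_fn(v), []):
                await q.put(v)
        for queues in self.subs.values():
            for q in queues:
                await q.put(CLOSED)


async def collect(q):
    items = []
    while (v := await q.get()) is not CLOSED:
        items.append(v)
    return items


async def main():
    source = asyncio.Queue()
    for v in [*range(10), CLOSED]:
        source.put_nowait(v)
    pub = TopicPub(lambda x: x % 3)
    subs = {'p0': pub.sub(0), 'px': pub.sub(0), 'p1': pub.sub(1), 'p2': pub.sub(2)}
    await pub.run(source)
    return {name: await collect(q) for name, q in subs.items()}


result = asyncio.run(main())
for name, items in result.items():
    print(name, items)
# p0 [0, 3, 6, 9]
# px [0, 3, 6, 9]
# p1 [1, 4, 7]
# p2 [2, 5, 8]
```

As in the notebook example, two subscribers to the same topic (`p0` and `px`) each receive every value for that topic.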

A `Pub` also has a method `unsub`, which can be used to unsubscribe a currently subscribed channel. There is also `unsub_all`, which can unsubscribe a whole topic in one go. For more information, please read the documentation for `Pub`.

As before, the `tap` and `sub` methods both take `out` and `close` arguments that have their usual meaning.

To recap:

* use `distribute` to distribute values to downstream channels
* use `dup` to duplicate values
* use `pub` for publisher-subscriber systems