Finite Sequences
======

In [1]:
import json

In [2]:
# Raw input records: each entry is one JSON-encoded string, left unparsed so
# that every pipeline in this notebook has to decode it itself.
data = [
    '{"name": "Alice", "value": 1}',
    '{"name": "Bob", "value": 2}',
    '{"name": "Alice", "value": 3}',
    '{"name": "Alice", "value": 4}',
    '{"name": "Charlie", "value": 5}',
    '{"name": "Bob", "value": 6}',
    '{"name": "Alice", "value": 7}',
]

In [3]:
# Decode every JSON string into a dict. Built eagerly as a list so the result
# can be displayed here and reused by the next cell.
seq = [json.loads(line) for line in data]
seq

[{'name': 'Alice', 'value': 1},
 {'name': 'Bob', 'value': 2},
 {'name': 'Alice', 'value': 3},
 {'name': 'Alice', 'value': 4},
 {'name': 'Charlie', 'value': 5},
 {'name': 'Bob', 'value': 6},
 {'name': 'Alice', 'value': 7}]

In [4]:
import toolz
# pluck('value', ...) pulls the 'value' entry out of each record dict.
# NOTE: this rebinds `seq` from a list of dicts to a list of ints, so this cell
# cannot be re-run on its own -- re-run the parsing cell (In [3]) first.
seq = list(toolz.pluck('value', seq))
seq

[1, 2, 3, 4, 5, 6, 7]

In [5]:
# Finite case: all values are already in a list, so one reduction gives the total.
sum(seq)

28

Infinite Sequences
==========

In [6]:
def infinite_data():
    """Yield the raw JSON strings from the module-level ``data`` one at a time.

    Stands in for an unbounded source: consumers only ever see an iterator and
    cannot tell how many items are still to come.

    Yields:
        str: the next JSON-encoded record from ``data``.
    """
    yield from data

    # Here we stop, but we could keep going forever...
    # End the generator with a plain return. Since PEP 479 (the default from
    # Python 3.7) a StopIteration raised inside a generator body is converted
    # into RuntimeError, so ``raise StopIteration`` would crash the consumer.
    return

In [7]:
from operator import add
# Build the pipeline step by step; each line wraps the previous iterator.
# map and the toolz helpers return iterators, so no record is read here --
# the work happens when the next cell iterates over `seq`.
seq = infinite_data()
seq = map(json.loads, seq)         # decode each JSON string
seq = toolz.pluck('value', seq)    # keep only the 'value' field
seq = toolz.accumulate(add, seq)   # running total of the values

In [8]:
# Pull items through the pipeline one at a time; each printed number is the
# running total after one more record, available before the source is exhausted.
for item in seq:
    print(item)

1
3
6
10
15
21
28


Streams
=====

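The same computation as above, but with the control flow inverted: instead of pulling items out of an iterator, we build the pipeline first and then push each item into it as it arrives.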

In [10]:
from streams import Stream

In [11]:
# Collects the values the stream produces; `L.append` is attached as a sink in the next cell.
L = []

In [12]:
# Same parse -> extract 'value' -> running-sum pipeline as before, but declared
# up front on a Stream; data is pushed in later with source.emit().
source = Stream()
stream = (source.map(json.loads)
                .map(lambda x: x['value'])
                .scan(add))
# Two sinks on the same stream: every result is both printed and appended to L.
# NOTE(review): the outputs below start at 3 rather than 1 -- it looks like scan
# uses the first element only to seed its state and emits nothing for it;
# confirm against the streams documentation.
stream.sink(print)
stream.sink(L.append)

<streams.core.Sink at 0x7fc06cb8f630>

In [13]:
# Push each raw record into the pipeline; the sinks (print, L.append) fire as
# a side effect of emit().
for line in data:
    source.emit(line)

3
6
10
15
21
28


In [14]:
L

[3, 6, 10, 15, 21, 28]

In [15]:
# The stream keeps its state between cells: a later event continues from the
# previous running total (28 + 100). The trailing ';' hides the cell's return value.
source.emit('{"name": "Charlie", "value": 100}');

128


In [16]:
L

[3, 6, 10, 15, 21, 28, 128]

Dask
====

In [17]:
from dask.distributed import Client
# NOTE(review): processes=False presumably keeps the workers as threads inside
# this process instead of spawning subprocesses -- confirm against the
# dask.distributed Client documentation.
client = Client(processes=False)

In [18]:
# Rebinds `source` to a fresh Stream, so emits from here on feed only the
# pipeline defined in this cell (the earlier one is attached to the old object).
source = Stream()
# Same parse -> extract 'value' -> running-sum steps, now placed between
# to_dask()/scatter() and gather().
# NOTE(review): presumably the steps in between run via the Dask client created
# above and gather() brings the results back locally -- confirm against the
# streams documentation.
s = (source.to_dask()
           .scatter()
           .map(json.loads)
           .map(lambda x: x['value'])
           .scan(add)
           .gather())
s.sink(print)

<streams.core.Sink at 0x7fc048955240>

In [19]:
# Replay the same records through the Dask-backed pipeline; the printed running
# totals match the purely local stream above.
for line in data:
    source.emit(line)

3
6
10
15
21
28


In [20]:
# Ten more events of value 100: the running total continues from 28 in steps of 100.
for i in range(10):
    source.emit('{"name": "Charlie", "value": 100}')

128
228
328
428
528
628
728
828
928
1028


In [21]:
from tornado import gen


@gen.coroutine
def f():
    """Emit ten more records from inside a coroutine, yielding on every emit."""
    for _ in range(10):
        # waits until sinks are ready
        yield source.emit('{"name": "Charlie", "value": 100}')


# Schedule the coroutine on the client's event loop rather than calling it directly.
client.loop.add_callback(f)

1128
1228
1328
1428
1528
1628
1728
1828
1928
2028


Questions
======

1.  Do we have current or upcoming use cases for this that we could use to drive development?
2.  Do we know what standard practice for this kind of stream processing is today?
3.  Do we know Python users who are likely to give this a shot and provide feedback?
