/
split.py
134 lines (104 loc) · 3.87 KB
/
split.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# -*- coding: utf-8 -*-
# vim: sw=4:ts=4:expandtab
"""
riko.modules.split
~~~~~~~~~~~~~~~~~~
Provides functions for splitting a stream into identical copies.

Use split when you want to perform different operations on data from the same
stream. The Union module is the reverse of Split: it merges multiple input
streams into a single combined stream.
Examples:
basic usage::
>>> from riko.modules.split import pipe
>>>
>>> stream1, stream2 = pipe({'x': x} for x in range(5))
>>> next(stream1) == {'x': 0}
True
Attributes:
OPTS (dict): The default pipe options
DEFAULTS (dict): The default parser options
"""
from copy import deepcopy
from . import operator
import pygogo as gogo
# Options forwarded as keyword arguments to the `operator` decorator by both
# `pipe` and `async_pipe`.
# NOTE(review): the meaning of each key is defined by `operator`, which is not
# visible in this file -- presumably 'extract' names the conf field handed to
# `parser` and 'ptype' the type it is cast to; confirm against the decorator.
OPTS = {
    "extract": "splits",
    "ptype": "int",
    "objectify": False,
}

# Default configuration passed to `operator`: two copies unless conf overrides
DEFAULTS = {"splits": 2}

# Module-level logger (referenced by the `async_pipe` doctest as an errback)
logger = gogo.Gogo(__name__, monolog=True).logger
def parser(stream, splits, tuples, **kwargs):
    """Buffers the source stream and emits `splits` independent copies of it

    The whole of `stream` is read into memory the first time the returned
    generator is advanced; every copy is then produced from that buffer.

    Args:
        stream (Iter[dict]): The source stream. Note: this shares the `tuples`
            iterator, so consuming it will consume `tuples` as well.

        splits (int): The number of copies to create.

        tuples (Iter[(dict, obj)]): Iterable of tuples of (item, splits)
            `item` is an element in the source stream (a DotDict instance)
            and `splits` is an int. Note: this shares the `stream` iterator,
            so consuming it will consume `stream` as well. It is not read by
            this function.

        kwargs (dict): Keyword arguments (not read by this function).

    Yields:
        Iter(dict): One iterator per split. Each iterator lazily deep-copies
            the buffered items as it is consumed.

    Examples:
        >>> from itertools import repeat
        >>>
        >>> conf = {'splits': 3}
        >>> kwargs = {'conf': conf}
        >>> stream = (({'x': x}) for x in range(5))
        >>> tuples = zip(stream, repeat(conf['splits']))
        >>> streams = parser(stream, conf['splits'], tuples, **kwargs)
        >>> next(next(streams)) == {'x': 0}
        True
    """
    # Drain the stream once so that it can be replayed for every split
    buffered = list(stream)

    # Each split maps `deepcopy` over the buffer, so an item handed out by one
    # split is never the same object as the item handed out by another
    yield from (map(deepcopy, buffered) for _ in range(splits))
@operator(DEFAULTS, isasync=True, **OPTS)
def async_pipe(*args, **kwargs):
    """An asynchronous operator that eagerly copies a stream into several
    identical streams. This pipe is not lazy: the source is fully buffered.

    Args:
        items (Iter[dict]): The source stream.
        kwargs (dict): The keyword arguments passed to the wrapper

    Kwargs:
        conf (dict): The pipe configuration. May contain the key 'splits'.

            splits (int): the number of copies to create (default: 2).

    Returns:
        Deferred: twisted.internet.defer.Deferred iterable of streams

    Examples:
        >>> from riko.bado import react
        >>> from riko.bado.mock import FakeReactor
        >>>
        >>> def run(reactor):
        ...     callback = lambda x: print(next(next(x)) == {'x': 0})
        ...     d = async_pipe({'x': x} for x in range(5))
        ...     return d.addCallbacks(callback, logger.error)
        >>>
        >>> try:
        ...     react(run, _reactor=FakeReactor())
        ... except SystemExit:
        ...     pass
        ...
        True
    """
    # All of the work happens in `parser`; the `operator` decorator supplies
    # the arguments and (per the Returns note above) the async wrapping
    streams = parser(*args, **kwargs)
    return streams
@operator(DEFAULTS, **OPTS)
def pipe(*args, **kwargs):
    """An operator that eagerly copies a stream into several identical
    streams. This pipe is not lazy: the source is fully buffered.

    Args:
        items (Iter[dict]): The source stream.
        kwargs (dict): The keyword arguments passed to the wrapper

    Kwargs:
        conf (dict): The pipe configuration. May contain the key 'splits'.

            splits (int): the number of copies to create (default: 2).

    Yields:
        Iter(dict): a stream of items

    Examples:
        >>> items = [{'x': x} for x in range(5)]
        >>> stream1, stream2 = pipe(items)
        >>> next(stream1) == {'x': 0}
        True
        >>> len(list(pipe(items, conf={'splits': '3'})))
        3
    """
    # All of the work happens in `parser`; the `operator` decorator supplies
    # its arguments
    streams = parser(*args, **kwargs)
    return streams