# truncate.py
# -*- coding: utf-8 -*-
# vim: sw=4:ts=4:expandtab
"""
riko.modules.truncate
~~~~~~~~~~~~~~~~~~~~~
Provides functions for returning a specified number of items from a stream.

Contrast this with the tail module, which also limits the number of items,
but returns items from the bottom of the stream.

Examples:
    basic usage::

        >>> from riko.modules.truncate import pipe
        >>>
        >>> items = ({'x': x} for x in range(5))
        >>> len(list(pipe(items, conf={'count': '4'})))
        4

Attributes:
    OPTS (dict): The default pipe options
    DEFAULTS (dict): The default parser options
"""
from itertools import islice
from . import operator
import pygogo as gogo
# Default pipe options; unpacked as keyword arguments into the `operator`
# decorator applied to both `pipe` and `async_pipe`.
OPTS = {"ptype": "int"}
# Default parser options; passed positionally to the `operator` decorator.
# `start` is the default starting location documented by `pipe`/`async_pipe`.
DEFAULTS = {"start": 0}
# Module-level logger; the `async_pipe` doctest uses `logger.error` as errback.
logger = gogo.Gogo(__name__, monolog=True).logger
def parser(stream, objconf, tuples, **kwargs):
    """Parses the pipe content

    Returns a lazy slice of `stream` that skips the first `objconf.start`
    items and then yields at most `objconf.count` items.

    Args:
        stream (Iter[dict]): The source. Note: this shares the `tuples`
            iterator, so consuming it will consume `tuples` as well.

        objconf (obj): the item independent configuration (an Objectify
            instance). Its `start` and `count` attributes are read.

        tuples (Iter[(dict, obj)]): Iterable of tuples of (item, objconf)
            `item` is an element in the source stream and `objconf` is the item
            configuration (an Objectify instance). Note: this shares the
            `stream` iterator, so consuming it will consume `stream` as well.
            Not used by this parser.

        kwargs (dict): Keyword arguments. Not used by this parser.

    Returns:
        Iter(dict): The output stream

    Examples:
        >>> from meza.fntools import Objectify
        >>> from itertools import repeat
        >>>
        >>> kwargs = {'count': 4, 'start': 0}
        >>> objconf = Objectify(kwargs)
        >>> stream = ({'x': x} for x in range(5))
        >>> tuples = zip(stream, repeat(objconf))
        >>> len(list(parser(stream, objconf, tuples, **kwargs)))
        4
    """
    start = objconf.start
    # `islice` treats `stop` as an exclusive index, so `start + count` yields
    # exactly `count` items (fewer if the stream runs out first).
    stop = start + objconf.count
    return islice(stream, start, stop)
@operator(DEFAULTS, isasync=True, **OPTS)
def async_pipe(*args, **kwargs):
    """An operator that asynchronously returns a specified number of items
    from a stream.

    The body simply delegates to `parser`; the `operator` decorator (applied
    with `isasync=True`) supplies the wrapping.

    Args:
        items (Iter[dict]): The source.
        kwargs (dict): The keyword arguments passed to the wrapper

    Kwargs:
        conf (dict): The pipe configuration. Must contain the key 'count'.
            May contain the key 'start'.

            count (int): desired stream length
            start (int): starting location (default: 0)

    Returns:
        Deferred: twisted.internet.defer.Deferred truncated stream

    Examples:
        >>> from riko.bado import react
        >>> from riko.bado.mock import FakeReactor
        >>>
        >>> def run(reactor):
        ...     callback = lambda x: print(len(list(x)))
        ...     items = ({'x': x} for x in range(5))
        ...     d = async_pipe(items, conf={'count': 4})
        ...     return d.addCallbacks(callback, logger.error)
        >>>
        >>> try:
        ...     react(run, _reactor=FakeReactor())
        ... except SystemExit:
        ...     pass
        ...
        4
    """
    return parser(*args, **kwargs)
@operator(DEFAULTS, **OPTS)
def pipe(*args, **kwargs):
    """An operator that returns a specified number of items from a stream.

    The body simply delegates to `parser`; the `operator` decorator supplies
    the wrapping.

    Args:
        items (Iter[dict]): The source.
        kwargs (dict): The keyword arguments passed to the wrapper

    Kwargs:
        conf (dict): The pipe configuration. Must contain the key 'count'.
            May contain the key 'start'.

            count (int): desired stream length
            start (int): starting location (default: 0)

    Yields:
        dict: an item

    Examples:
        >>> items = [{'x': x} for x in range(5)]
        >>> len(list(pipe(items, conf={'count': '4'})))
        4
        >>> stream = pipe(items, conf={'count': '2', 'start': '2'})
        >>> next(stream) == {'x': 2}
        True
    """
    return parser(*args, **kwargs)