-
Notifications
You must be signed in to change notification settings - Fork 154
/
cborsiterators.py
51 lines (37 loc) · 1.24 KB
/
cborsiterators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from . import filters
from .handlers import reraise_exception
from .tariterators import url_opener
def cbor_iterator(data, handler=reraise_exception, info={}):
    """Yield every object decoded by ``cbor.load`` from each stream in ``data``.

    Args:
        data: Iterable of dicts; each dict must have a ``"stream"`` entry,
            which is handed to ``cbor.load`` repeatedly.
        handler: Accepted as part of the signature; not used by this function.
        info: Accepted as part of the signature; neither read nor modified.

    Yields:
        The objects returned by ``cbor.load``, source by source, in order.

    Raises:
        AssertionError: If a source is not a dict or has no ``"stream"`` key.
    """
    # Imported inside the generator body, so ``cbor`` is only required once
    # iteration actually starts.
    import cbor

    for source in data:
        assert isinstance(source, dict)
        assert "stream" in source
        stream = source["stream"]
        while True:
            try:
                sample = cbor.load(stream)
            except EOFError:
                # EOF ends this stream only. Returning from the generator here
                # would silently drop every remaining source in ``data``.
                break
            yield sample
def cbors_samples(src, handler=reraise_exception):
    """Open ``src`` via ``url_opener`` and decode the result with ``cbor_iterator``.

    Args:
        src: Passed unchanged to ``url_opener`` as its first argument.
        handler: Forwarded to ``url_opener`` as ``handler``; it is not passed
            on to ``cbor_iterator``.

    Returns:
        The generator produced by ``cbor_iterator`` over the opened streams.
    """
    return cbor_iterator(url_opener(src, handler=handler))
# ``cbors_samples`` wrapped by ``filters.pipelinefilter``.
# NOTE(review): presumably this makes it usable as a pipeline stage -- confirm
# against ``filters.pipelinefilter``.
cbors_to_samples = filters.pipelinefilter(cbors_samples)
def cbor2_iterator(data, handler=reraise_exception, info={}):
    """Yield every object decoded by ``cbor2.load`` from each stream in ``data``.

    Args:
        data: Iterable of dicts; each dict must have a ``"stream"`` entry,
            which is handed to ``cbor2.load`` repeatedly.
        handler: Accepted as part of the signature; not used by this function.
        info: Accepted as part of the signature; neither read nor modified.

    Yields:
        The objects returned by ``cbor2.load``, source by source, in order.

    Raises:
        AssertionError: If a source is not a dict or has no ``"stream"`` key.
    """
    # Imported inside the generator body, so ``cbor2`` is only required once
    # iteration actually starts.
    import cbor2

    for source in data:
        assert isinstance(source, dict)
        assert "stream" in source
        stream = source["stream"]
        while True:
            try:
                sample = cbor2.load(stream)
            except EOFError:
                # EOF ends this stream only. Returning from the generator here
                # would silently drop every remaining source in ``data``.
                break
            yield sample
def cbors2_samples(src, handler=reraise_exception):
    """Open ``src`` via ``url_opener`` and decode the result with ``cbor2_iterator``.

    Args:
        src: Passed unchanged to ``url_opener`` as its first argument.
        handler: Forwarded to ``url_opener`` as ``handler``.

    Returns:
        The generator produced by ``cbor2_iterator`` over the opened streams.
    """
    streams = url_opener(src, handler=handler)
    # Use the cbor2-backed iterator: calling ``cbor_iterator`` here made this
    # function an exact duplicate of ``cbors_samples`` and required ``cbor``.
    samples = cbor2_iterator(streams)
    return samples
# ``cbors2_samples`` wrapped by ``filters.pipelinefilter``. This must wrap the
# ``cbors2`` variant; wrapping ``cbors_samples`` made it an alias of
# ``cbors_to_samples``.
cbors2_to_samples = filters.pipelinefilter(cbors2_samples)