-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathtest_functions.py
42 lines (34 loc) · 1.62 KB
/
test_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import datetime
import unittest
from typing import Callable, Iterator, List, TypeVar, cast
from streamable.functions import catch, flatten, group, map, observe, throttle, truncate
# Generic type variable; nothing in the visible part of this file references it.
T = TypeVar("T")
# Size of the test collections.
N = 256
# Shared integer source (0 .. N-1); the tests below wrap it with iter().
src = range(N)
class TestFunctions(unittest.TestCase):
    """Signature checks for the helpers imported from ``streamable.functions``."""

    def test_signatures(self) -> None:
        """Call every helper with each argument form exercised here.

        Each result is bound to an explicitly annotated local and is never
        iterated by this test; only ``grouped_size_only`` is reused, as the
        input of the ``flatten`` calls. The annotations are presumably what a
        static type checker validates -- TODO confirm against the project's CI.
        """
        source_it = iter(src)
        # Typed placeholder: ``...`` is not callable, and this test itself never calls it.
        func = cast(Callable[[int], int], ...)

        # map: default call, then explicit concurrency values.
        mapped_default: Iterator[int] = map(func, source_it)
        mapped_concurrency_1: Iterator[int] = map(func, source_it, concurrency=1)
        mapped_concurrency_2: Iterator[int] = map(func, source_it, concurrency=2)

        # group: by size alone, then with a sub-second and a multi-second interval.
        grouped_size_only: Iterator[List[int]] = group(source_it, size=1)
        sub_second = datetime.timedelta(seconds=0.1)
        grouped_sub_second: Iterator[List[int]] = group(
            source_it, size=1, interval=sub_second
        )
        two_seconds = datetime.timedelta(seconds=2)
        grouped_two_seconds: Iterator[List[int]] = group(
            source_it, size=1, interval=two_seconds
        )

        # flatten: all three calls take the size-only grouping as input.
        flattened_default: Iterator[int] = flatten(grouped_size_only)
        flattened_concurrency_1: Iterator[int] = flatten(
            grouped_size_only, concurrency=1
        )
        flattened_concurrency_2: Iterator[int] = flatten(
            grouped_size_only, concurrency=2
        )

        # catch: without and with the finally_raise flag.
        caught_default: Iterator[int] = catch(source_it, Exception)
        caught_finally_raise: Iterator[int] = catch(
            source_it, Exception, finally_raise=True
        )

        observed: Iterator[int] = observe(source_it, what="objects")

        # throttle: the limit is passed positionally, the period by keyword.
        one_second = datetime.timedelta(seconds=1)
        throttled: Iterator[int] = throttle(source_it, 1, per=one_second)

        truncated: Iterator[int] = truncate(source_it, count=1)