forked from dask/distributed
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_utils_comm.py
27 lines (19 loc) · 853 Bytes
/
test_utils_comm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from __future__ import print_function, division, absolute_import
import pytest
from distributed.core import rpc
from distributed.utils_test import gen_cluster
from distributed.utils_comm import pack_data, gather_from_workers
def test_pack_data():
    """Check ``pack_data`` against tuple, dict, and dict-of-list inputs.

    The lookup mapping only contains ``"x"``; each case expects ``"x"`` to be
    replaced by ``1`` while ``"y"`` (absent from the mapping) is returned
    unchanged.
    """
    lookup = {"x": 1}
    # (input structure, expected result) pairs, checked in order.
    cases = [
        (("x", "y"), (1, "y")),
        ({"a": "x", "b": "y"}, {"a": 1, "b": "y"}),
        ({"a": ["x"], "b": "y"}, {"a": [1], "b": "y"}),
    ]
    for template, expected in cases:
        assert pack_data(template, lookup) == expected
@pytest.mark.xfail(reason="rpc now needs to be a connection pool")
@gen_cluster(client=True)
def test_gather_from_workers_permissive(c, s, a, b):
    """Gather one scattered key and one key that was never scattered.

    Only ``"x"`` is scattered (to worker ``a``). The gather request then asks
    for ``"x"`` from ``a`` and ``"y"`` from ``b``; the test expects ``"x"`` in
    the returned data and ``"y"`` to be reported as missing.

    Currently marked as an expected failure (see the ``xfail`` reason).
    """
    # Place the single known key on worker ``a`` only; the result is unused.
    scattered = yield c.scatter({"x": 1}, workers=a.address)

    # Key -> list of worker addresses to ask for that key.
    who_has = {"x": [a.address], "y": [b.address]}
    gathered = yield gather_from_workers(who_has, rpc=rpc)
    found, not_found, failed_workers = gathered

    assert found == {"x": 1}
    assert list(not_found) == ["y"]