Test failure in test_serializable_locks #4281

Open
ArchangeGabriel opened this issue Jul 28, 2020 · 2 comments

@ArchangeGabriel
Contributor

What happened: The test test_serializable_locks in xarray/tests/test_distributed.py failed with asyncio.exceptions.TimeoutError.

What you expected to happen: The test should pass.

Minimal Complete Verifiable Example:

python setup.py build
pytest

Anything else we need to know?:

Full pytest output for the failing test:

___________________________ test_serializable_locks ____________________________

    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
    
            async def coro():
                with dask.config.set(config):
                    s = False
                    for i in range(5):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster, retrying",
                                exc_info=True,
                            )
                            await asyncio.sleep(1)
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs,
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = asyncio.wait_for(future, timeout)
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == "closed")
                        await end_cluster(s, workers)
                        await asyncio.wait_for(cleanup_global_workers(), 1)
    
                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)
    
                    def get_unclosed():
                        return [c for c in Comm._instances if not c.closed()] + [
                            c
                            for c in _global_clients.values()
                            if c.status != "closed"
                        ]
    
                    try:
                        start = time()
                        while time() < start + 5:
                            gc.collect()
                            if not get_unclosed():
                                break
                            await asyncio.sleep(0.05)
                        else:
                            if allow_unclosed:
                                print(f"Unclosed Comms: {get_unclosed()}")
                            else:
                                raise RuntimeError("Unclosed Comms", get_unclosed())
                    finally:
                        Comm._instances.clear()
                        _global_clients.clear()
    
                    return result
    
>           result = loop.run_sync(
                coro, timeout=timeout * 2 if timeout else timeout
            )

/usr/lib/python3.8/site-packages/distributed/utils_test.py:953: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib/python3.8/site-packages/tornado/ioloop.py:532: in run_sync
    return future_cell[0].result()
/usr/lib/python3.8/site-packages/distributed/utils_test.py:912: in coro
    result = await future
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

fut = <Task cancelled name='Task-701' coro=<test_serializable_locks() done, defined at /build/python-xarray/src/xarray-0.16.0/xarray/tests/test_distributed.py:220>>
timeout = 10

    async def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
    
        Coroutine will be wrapped in Task.
    
        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().
    
        If the wait is cancelled, the task is also cancelled.
    
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_running_loop()
        else:
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)
    
        if timeout is None:
            return await fut
    
        if timeout <= 0:
            fut = ensure_future(fut, loop=loop)
    
            if fut.done():
                return fut.result()
    
            fut.cancel()
            raise exceptions.TimeoutError()
    
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
    
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
    
        try:
            # wait until the future completes or the timeout
            try:
                await waiter
            except exceptions.CancelledError:
                fut.remove_done_callback(cb)
                fut.cancel()
                raise
    
            if fut.done():
                return fut.result()
            else:
                fut.remove_done_callback(cb)
                # We must ensure that the task is not running
                # after wait_for() returns.
                # See https://bugs.python.org/issue32751
                await _cancel_and_wait(fut, loop=loop)
>               raise exceptions.TimeoutError()
E               asyncio.exceptions.TimeoutError

/usr/lib/python3.8/asyncio/tasks.py:490: TimeoutError
----------------------------- Captured stderr call -----------------------------
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:40483
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:45589
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:45589
distributed.worker - INFO -          dashboard at:            127.0.0.1:43247
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:40483
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                  135.05 GB
distributed.worker - INFO -       Local Directory: /build/python-xarray/src/xarray-0.16.0/dask-worker-space/worker-nbb_tie4
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:40153
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:40153
distributed.worker - INFO -          dashboard at:            127.0.0.1:33037
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:40483
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                  135.05 GB
distributed.worker - INFO -       Local Directory: /build/python-xarray/src/xarray-0.16.0/dask-worker-space/worker-qyb14fl2
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:45589', name: 0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45589
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://127.0.0.1:40153', name: 1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:40153
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:40483
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:40483
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-01142bc3-d0d0-11ea-84a5-6cb311234570
distributed.core - INFO - Starting established connection
distributed.protocol.pickle - INFO - Failed to serialize (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [0]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>}). Exception: dumps() got an unexpected keyword argument 'buffer_callback'
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/protocol/core.py", line 36, in dumps
    data = {
  File "/usr/lib/python3.8/site-packages/distributed/protocol/core.py", line 37, in <dictcomp>
    key: serialize(
  File "/usr/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 244, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [0]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})")
distributed.comm.utils - INFO - Unserializable Message: [{'op': 'update-graph', 'tasks': {'f-e71c719fc7843a71d9b6ed77ba5c1139': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [0]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-3e2e39666f6b20c1585ae513bd6c58a1': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [1]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-92de0310d2e920306d74497bcdbd21f1': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [2]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-f210a433c90cbabd4a839e5957c2ef7c': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [3]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-5ecad7d214286d3d4cb2c0f51d71596c': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [4]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-662771094d226039899ae3b5f21c6184': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [5]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-6aa952faad415c3c2dab888f1e97135e': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [6]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-c5a149b0b56aa271ac0e5456fcbd0c7c': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [7]), {'lock': 
<SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-2d7b671f31682ff35e7b351af7c5ca2b': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [8]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>, 'f-0f7fc14155cfae899da1d79302c723c1': <Serialize: (<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [9]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})>}, 'dependencies': {}, 'keys': ['f-e71c719fc7843a71d9b6ed77ba5c1139', 'f-3e2e39666f6b20c1585ae513bd6c58a1', 'f-92de0310d2e920306d74497bcdbd21f1', 'f-f210a433c90cbabd4a839e5957c2ef7c', 'f-5ecad7d214286d3d4cb2c0f51d71596c', 'f-662771094d226039899ae3b5f21c6184', 'f-6aa952faad415c3c2dab888f1e97135e', 'f-c5a149b0b56aa271ac0e5456fcbd0c7c', 'f-2d7b671f31682ff35e7b351af7c5ca2b', 'f-0f7fc14155cfae899da1d79302c723c1'], 'restrictions': {}, 'loose_restrictions': [], 'priority': {'f-e71c719fc7843a71d9b6ed77ba5c1139': 0, 'f-3e2e39666f6b20c1585ae513bd6c58a1': 1, 'f-92de0310d2e920306d74497bcdbd21f1': 2, 'f-f210a433c90cbabd4a839e5957c2ef7c': 3, 'f-5ecad7d214286d3d4cb2c0f51d71596c': 4, 'f-662771094d226039899ae3b5f21c6184': 5, 'f-6aa952faad415c3c2dab888f1e97135e': 6, 'f-c5a149b0b56aa271ac0e5456fcbd0c7c': 7, 'f-2d7b671f31682ff35e7b351af7c5ca2b': 8, 'f-0f7fc14155cfae899da1d79302c723c1': 9}, 'user_priority': 0, 'resources': None, 'submitting_task': None, 'retries': None, 'fifo_timeout': '100 ms', 'actors': False}]
distributed.comm.utils - ERROR - ('Could not serialize object of type tuple.', "(<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [0]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})")
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/comm/utils.py", line 34, in _to_frames
    protocol.dumps(
  File "/usr/lib/python3.8/site-packages/distributed/protocol/core.py", line 36, in dumps
    data = {
  File "/usr/lib/python3.8/site-packages/distributed/protocol/core.py", line 37, in <dictcomp>
    key: serialize(
  File "/usr/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 244, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [0]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})")
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/batched.py", line 92, in _background_send
    nbytes = yield self.comm.write(
  File "/usr/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/lib/python3.8/site-packages/distributed/comm/tcp.py", line 220, in write
    frames = await to_frames(
  File "/usr/lib/python3.8/site-packages/distributed/comm/utils.py", line 54, in to_frames
    return _to_frames()
  File "/usr/lib/python3.8/site-packages/distributed/comm/utils.py", line 34, in _to_frames
    protocol.dumps(
  File "/usr/lib/python3.8/site-packages/distributed/protocol/core.py", line 36, in dumps
    data = {
  File "/usr/lib/python3.8/site-packages/distributed/protocol/core.py", line 37, in <dictcomp>
    key: serialize(
  File "/usr/lib/python3.8/site-packages/distributed/protocol/serialize.py", line 244, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type tuple.', "(<function apply at 0x7f8c2cb36940>, <function test_serializable_locks.<locals>.f at 0x7f8c1da0e040>, (<class 'tuple'>, [0]), {'lock': <SerializableLock: 789a59f0-0049-40aa-a0ed-0f2fe6c213a8>})")
distributed.scheduler - INFO - Remove client Client-01142bc3-d0d0-11ea-84a5-6cb311234570
distributed.scheduler - INFO - Remove client Client-01142bc3-d0d0-11ea-84a5-6cb311234570
distributed.scheduler - INFO - Close client connection: Client-01142bc3-d0d0-11ea-84a5-6cb311234570
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:45589
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:40153
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:45589', name: 0, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:45589
distributed.scheduler - INFO - Remove worker <Worker 'tcp://127.0.0.1:40153', name: 1, memory: 0, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:40153
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
------------------------------ Captured log call -------------------------------
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-702' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-703' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-704' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-705' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-706' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-707' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-708' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-709' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-710' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
ERROR    asyncio:base_events.py:1707 Task exception was never retrieved
future: <Task finished name='Task-711' coro=<Client._gather.<locals>.wait() done, defined at /usr/lib/python3.8/site-packages/distributed/client.py:1816> exception=AllExit()>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/distributed/client.py", line 1821, in wait
    raise AllExit()
distributed.client.AllExit
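
The captured stderr above shows the error that preceded the timeout: distributed.protocol.pickle reported `dumps() got an unexpected keyword argument 'buffer_callback'`. It appears the update-graph message was therefore never sent, and the test waited in gather until `asyncio.wait_for` hit its 10-second timeout. `buffer_callback` is the pickle protocol 5 keyword that the standard library's `pickle.dumps` accepts from Python 3.8 onward. One plausible but unconfirmed explanation is that the pickler distributed called, for example an older cloudpickle, does not accept it. The sketch below uses a hypothetical helper, `supports_buffer_callback`, which is not part of dask or distributed. It probes a `dumps` callable for that keyword and shows what the keyword does with the stdlib pickler. `legacy_dumps` is likewise a hypothetical stand-in for a pre-protocol-5 pickler.

```python
import pickle


def supports_buffer_callback(dumps) -> bool:
    """Return True if `dumps` accepts protocol=5 and the buffer_callback keyword.

    Hypothetical diagnostic helper; not part of dask or distributed.
    """
    try:
        dumps(b"probe", protocol=5, buffer_callback=lambda buf: None)
    except TypeError:
        # e.g. "dumps() got an unexpected keyword argument 'buffer_callback'"
        return False
    except ValueError:
        # e.g. a pickler that rejects protocol 5 outright
        return False
    return True


def legacy_dumps(obj, protocol=None):
    """Hypothetical stand-in for a pickler that predates protocol 5."""
    return pickle.dumps(obj, protocol=4)


if __name__ == "__main__":
    print(supports_buffer_callback(pickle.dumps))  # True on Python >= 3.8
    print(supports_buffer_callback(legacy_dumps))  # False

    # What buffer_callback is for: buffers wrapped in PickleBuffer are handed
    # to the callback and kept out of the pickle stream ("out-of-band").
    buffers = []
    payload = pickle.PickleBuffer(bytearray(b"abc"))
    data = pickle.dumps(payload, protocol=5, buffer_callback=buffers.append)
    restored = pickle.loads(data, buffers=buffers)
    print(len(buffers), bytes(restored))  # 1 b'abc'
```

Probing by calling the function, rather than inspecting its signature, also catches picklers whose signature accepts `**kwargs` but fail at call time.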

Environment:

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.8.4 (default, Jul 15 2020, 10:38:22)
[GCC 10.1.0]
python-bits: 64
OS: Linux
OS-release: 5.7.8-arch1-1
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.12.0
libnetcdf: 4.7.4

xarray: 0.16.0
pandas: 1.0.5
numpy: 1.19.1
scipy: 1.5.2
netCDF4: 1.5.4
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: None
cftime: 1.2.1
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.3.2
dask: 2.21.0
distributed: 2.21.0
matplotlib: 3.3.0
cartopy: None
seaborn: 0.10.1
numbagg: None
pint: 0.0.0
setuptools: 49.2.0
pip: None
conda: None
pytest: 5.4.3
IPython: None
sphinx: None
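
The version list above does not include cloudpickle, which distributed uses to pickle objects such as the locally defined function f in this test. When a failure points at serialization, it may help to report that version too. Below is a minimal sketch using the standard library's `importlib.metadata` (Python 3.8+). `report_versions` is a hypothetical helper name, not an xarray API.

```python
from importlib import metadata


def report_versions(packages):
    """Map each distribution name to its installed version, or None if absent.

    Hypothetical helper for bug reports; not part of xarray.
    """
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None  # not installed in this environment
    return versions


if __name__ == "__main__":
    for name, version in report_versions(["dask", "distributed", "cloudpickle"]).items():
        print(f"{name}: {version}")
```
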

@keewis
Collaborator

keewis commented Nov 25, 2020

We sometimes see this on CI too. It is an issue in distributed and somewhat flaky (I think it is caused by network timeouts), and rerunning the test usually fixes it. I'm not sure whether we can do anything to avoid this.
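
If the failure is an intermittent timeout, retrying is one mitigation. Retrying would not help if the failure is deterministic, as a pickler or version mismatch behind the `buffer_callback` error in the original log would be. The pytest-rerunfailures plugin provides reruns through its `--reruns` option. For illustration, here is a minimal stdlib-only sketch of the same idea for async test bodies. `retry_async` is a hypothetical name, not an xarray or distributed API.

```python
import asyncio
import functools


def retry_async(attempts=3, exceptions=(asyncio.TimeoutError,)):
    """Re-run an async function up to `attempts` times on the given exceptions.

    Hypothetical helper; exceptions not listed propagate immediately, and
    the final failure is re-raised.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts:
                        raise  # out of retries: surface the last error
        return wrapper

    return decorator


if __name__ == "__main__":
    calls = []

    @retry_async(attempts=3)
    async def flaky():
        calls.append(1)
        if len(calls) < 3:
            raise asyncio.TimeoutError()  # simulate two transient timeouts
        return "ok"

    print(asyncio.run(flaky()), len(calls))  # ok 3
```
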

@ArchangeGabriel
Contributor Author

The error does seem to vary; this time I got the output in error.log.
