Conversation

@jakirkham
Member

@jakirkham jakirkham commented May 22, 2020

Fixes #230

Try creating/freeing the mutex in init/destroy to fix an issue with leaking semaphores.

cc @alimanfoo @rc-conway @gigony @quasiben

TODO:

  • Unit tests and/or doctests in docstrings
  • tox -e py38 passes locally
  • Docstrings and API docs for any new/modified user-facing classes and functions
  • Changes documented in docs/release.rst
  • tox -e docs passes locally
  • AppVeyor and Travis CI passes
  • Test coverage to 100% (Coveralls passes)

@jakirkham
Member Author

Friendly nudge @gigony @quasiben 🙂 Have either of you given this a try?

@gigony

gigony commented May 29, 2020

Hi @jakirkham ,
I tried to execute the failing code with the updated numcodecs, but it still shows the error.

(dp) gbae@gbae:~/gitlab/clara/sdk/Applications/Operators/image_processing/app_dp_sample$ python fail.py
Done: (100, 100, 3)
/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:145: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:154: UserWarning: # name: /mp-kqu7ey81
  warnings.warn("# name: %s" % name)

(dp) gbae@gbae:~/gitlab/clara/sdk/Applications/Operators/image_processing/app_dp_sample$ pip freeze | grep numcodecs
-e git+git@github.com:zarr-developers/numcodecs.git@66cdfcf46fcac7b25fbcebabda87ab58d3352fcc#egg=numcodecs

I am using your patch to execute (66cdfcf46fcac7b25fbcebabda87ab58d3352fcc)

(dp) gbae@gbae:~/gitlab/clara/numcodecs-master$ git show
commit 66cdfcf46fcac7b25fbcebabda87ab58d3352fcc (HEAD -> master)
Author: Gigon Bae gbae@nvidia.com
Date: Thu May 28 17:24:37 2020 -0700

test
diff --git a/docs/release.rst b/docs/release.rst
index e7b123d..c801f0a 100644
--- a/docs/release.rst
+++ b/docs/release.rst
@@ -13,6 +13,9 @@ Upcoming Release
 * Drop support for Python 2.
   By :user:`James Bourbeau <jrbourbeau>`, :issue:`220`.
 
+* Fix leaked semaphore in ``numcodecs.blosc``.
+  By :user:`John Kirkham <jakirkham>`, :issue:`234`.
+
 
 .. _release_0.6.4:
 
diff --git a/numcodecs/blosc.pyx b/numcodecs/blosc.pyx
index ebd2271..1c7e212 100644
--- a/numcodecs/blosc.pyx
+++ b/numcodecs/blosc.pyx
@@ -74,10 +74,7 @@ AUTOSHUFFLE = -1
 AUTOBLOCKS = 0
 
 # synchronization
-try:
-    mutex = multiprocessing.Lock()
-except OSError:
-    mutex = None
+mutex = None
 
 # store ID of process that first loads the module, so we can detect a fork later
 _importer_pid = os.getpid()
@@ -85,12 +82,20 @@ _importer_pid = os.getpid()
 
 def init():
     """Initialize the Blosc library environment."""
+    global mutex
+    # synchronization
+    try:
+        mutex = multiprocessing.Lock()
+    except OSError:
+        mutex = None
     blosc_init()
 
 
 def destroy():
     """Destroy the Blosc library environment."""
+    global mutex
     blosc_destroy()
+    mutex = None
 
 
 def compname_to_compcode(cname):

@quasiben

quasiben commented Jun 4, 2020

I no longer see the leaked semaphore warning. However, for my particular use case I am still not having success with generating 100TBs of data with Dask+Zarr.

@jakirkham
Member Author

I'm wondering if Numcodecs was not getting cloned correctly in your case @gigony.

@quasiben, thanks for the update. Maybe we can look into the issue you ran into offline?

@gigony

gigony commented Jun 4, 2020

@jakirkham let me double-check (by adding a probe) to make sure that numcodecs is cloned correctly today.

@jakirkham
Member Author

Just to add, Numcodecs has submodules. So I would clone it, run git submodule update --init --recursive, then use pip install . locally.

I'm not sure whether pip would initialize these submodules or whether GitHub supplies them via the URL. Though it's not something I've tested extensively.

@gigony

gigony commented Jun 4, 2020

Ah, for the submodule thing, I already executed those commands when I cloned/applied your patch -- without that, numcodecs won't build the blosc module.

Let me create a docker image that can reproduce the error, if possible.

@jakirkham
Member Author

Ah ok. Sorry if I'm misreading things then.

@gigony

gigony commented Jun 5, 2020

hi @jakirkham ,
I found that the patch wasn't applied in my previous setup (pip install . created a blosc library even without Cython due to this logic: https://github.com/zarr-developers/numcodecs/blob/master/setup.py#L13).

With Cython installed, I was actually able to test it with your patch.
However, it still shows the warning message.

You can reproduce with https://github.com/gigony/test-numcodecs

git clone https://github.com/gigony/test-numcodecs.git
cd test-numcodecs
./run_numcodecs_leak.sh patch
Successfully installed numcodecs-0.6.5.dev9+dirty
numcodecs @ file:///app/numcodecs
### Mutex created in init()###
### Mutex created in init()###
Done: (100, 100, 3)
/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))
### Mutex destroyed in destroy() ###

One weird thing is that init() was called twice while destroy() was called only once.
Please check it.

@jakirkham
Member Author

Ah interesting. Thanks for the detailed info, Gigon. Will take a closer look 🙂

@rc-conway

Yeah, probably should guard it with

def init():
    """Initialize the Blosc library environment."""
    global MUTEX
    # synchronization
    if MUTEX is None:
        try:
            MUTEX = multiprocessing.Lock()
        except OSError:
            MUTEX = None
    blosc_init()

@jakirkham
Member Author

Well, that doesn't quite work, as it doesn't distinguish whether the lock could be created or not. So it will end up calling blosc_init twice, which it shouldn't do. We could imagine doing something similar as a workaround. However, the fact that this function is being called twice is a bit odd. We should figure out why that is (as that shouldn't be happening).

@gigony

gigony commented Jun 5, 2020

The following are the call stacks when init() was called -- the first init() was called by the main program (fail.py, via from numcodecs import Blosc) and the second was called when subprocesses were spawned (/usr/local/lib/python3.6/dist-packages/zarr/__init__.py, via from zarr.codecs import *).

To see the call stack, the following line was added to the patch.

+    print("### Mutex created in init()###");import os,threading; print("# process ID:{}, thread ID:{}".format(os.getpid(), threading.get_ident()));import inspect; print(repr(inspect.stack())); # synchronization
### Mutex created in init()###
# process ID:451, thread ID:140389632247616
[
FrameInfo(frame=<frame object at 0x1d84d78>, filename='/usr/local/lib/python3.6/dist-packages/numcodecs/__init__.py', lineno=51, function='<module>', code_context=['    blosc.init()\n'], index=0), 
FrameInfo(frame=<frame object at 0x1cb31e8>, filename='<frozen importlib._bootstrap>', lineno=219, function='_call_with_frames_removed', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7f47a2502788>, filename='<frozen importlib._bootstrap_external>', lineno=678, function='exec_module', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7f47a5360c48>, filename='<frozen importlib._bootstrap>', lineno=665, function='_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1d66268>, filename='<frozen importlib._bootstrap>', lineno=955, function='_find_and_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1cdf958>, filename='<frozen importlib._bootstrap>', lineno=971, function='_find_and_load', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x17a2078>, filename='/app/fail.py', lineno=46, function='<module>', code_context=['        from numcodecs import Blosc\n'], index=0)]


### Mutex created in init()###
# process ID:502, thread ID:140094067480384
[
FrameInfo(frame=<frame object at 0x1fc82b8>, filename='/usr/local/lib/python3.6/dist-packages/numcodecs/__init__.py', lineno=51, function='<module>', code_context=['    blosc.init()\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4f9baccc0>, filename='<frozen importlib._bootstrap>', lineno=219, function='_call_with_frames_removed', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4f932f048>, filename='<frozen importlib._bootstrap_external>', lineno=678, function='exec_module', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4f9329a48>, filename='<frozen importlib._bootstrap>', lineno=665, function='_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1fc8078>, filename='<frozen importlib._bootstrap>', lineno=955, function='_find_and_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1fc7e28>, filename='<frozen importlib._bootstrap>', lineno=971, function='_find_and_load', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4f932a398>, filename='/usr/local/lib/python3.6/dist-packages/zarr/codecs.py', lineno=3, function='<module>', code_context=['from numcodecs import *\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4f9bacaf8>, filename='<frozen importlib._bootstrap>', lineno=219, function='_call_with_frames_removed', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4f9ba7cf8>, filename='<frozen importlib._bootstrap_external>', lineno=678, function='exec_module', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4f9329848>, filename='<frozen importlib._bootstrap>', lineno=665, function='_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1fc7be8>, filename='<frozen importlib._bootstrap>', lineno=955, function='_find_and_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1fc79c8>, filename='<frozen importlib._bootstrap>', lineno=971, function='_find_and_load', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4f932a1f0>, filename='/usr/local/lib/python3.6/dist-packages/zarr/__init__.py', lineno=3, function='<module>', code_context=['from zarr.codecs import *\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4f9bac930>, filename='<frozen importlib._bootstrap>', lineno=219, function='_call_with_frames_removed', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4fb496048>, filename='<frozen importlib._bootstrap_external>', lineno=678, function='exec_module', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4fb48cc48>, filename='<frozen importlib._bootstrap>', lineno=665, function='_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1fc7788>, filename='<frozen importlib._bootstrap>', lineno=955, function='_find_and_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1fc7568>, filename='<frozen importlib._bootstrap>', lineno=971, function='_find_and_load', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4ec001558>, filename='<frozen importlib._bootstrap>', lineno=219, function='_call_with_frames_removed', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1fbc4c8>, filename='<frozen importlib._bootstrap>', lineno=941, function='_find_and_load_unlocked', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x1fbc2a8>, filename='<frozen importlib._bootstrap>', lineno=971, function='_find_and_load', code_context=None, index=None), 
FrameInfo(frame=<frame object at 0x7fe4f9329448>, filename='/usr/local/lib/python3.6/dist-packages/distributed/protocol/pickle.py', lineno=66, function='loads', code_context=['            return pickle.loads(x)\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4f9baa220>, filename='/usr/local/lib/python3.6/dist-packages/distributed/protocol/serialize.py', lineno=64, function='pickle_loads', code_context=['    return pickle.loads(x, buffers=buffers)\n'], index=0), 
FrameInfo(frame=<frame object at 0x1fc5e38>, filename='/usr/local/lib/python3.6/dist-packages/distributed/protocol/serialize.py', lineno=302, function='deserialize', code_context=['    return loads(header, frames)\n'], index=0), 
FrameInfo(frame=<frame object at 0x1fc54b8>, filename='/usr/local/lib/python3.6/dist-packages/distributed/protocol/core.py', lineno=130, function='loads', code_context=['                value = _deserialize(head, fs, deserializers=deserializers)\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4ec004198>, filename='/usr/local/lib/python3.6/dist-packages/distributed/comm/utils.py', lineno=66, function='_from_frames', code_context=['                frames, deserialize=deserialize, deserializers=deserializers\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4ec003f48>, filename='/usr/local/lib/python3.6/dist-packages/distributed/comm/utils.py', lineno=87, function='from_frames', code_context=['        res = _from_frames()\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4ec019938>, filename='/usr/local/lib/python3.6/dist-packages/distributed/comm/tcp.py', lineno=206, function='read', code_context=['                    allow_offload=self.allow_offload,\n'], index=0), 
FrameInfo(frame=<frame object at 0x1fc6e48>, filename='/usr/local/lib/python3.6/dist-packages/distributed/core.py', lineno=503, function='handle_stream', code_context=['                msgs = await comm.read()\n'], index=0), 
FrameInfo(frame=<frame object at 0x1fc6bc8>, filename='/usr/local/lib/python3.6/dist-packages/distributed/worker.py', lineno=915, function='handle_scheduler', code_context=['                comm, every_cycle=[self.ensure_communicating, self.ensure_computing]\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4ec018b58>, filename='/usr/lib/python3.6/asyncio/events.py', lineno=145, function='_run', code_context=['            self._callback(*self._args)\n'], index=0), 
FrameInfo(frame=<frame object at 0x1fb9038>, filename='/usr/lib/python3.6/asyncio/base_events.py', lineno=1451, function='_run_once', code_context=['                handle._run()\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4fa418448>, filename='/usr/lib/python3.6/asyncio/base_events.py', lineno=438, function='run_forever', code_context=['                self._run_once()\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4ec002338>, filename='/usr/local/lib/python3.6/dist-packages/tornado/platform/asyncio.py', lineno=149, function='start', code_context=['            self.asyncio_loop.run_forever()\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4fa418248>, filename='/usr/local/lib/python3.6/dist-packages/tornado/ioloop.py', lineno=526, function='run_sync', code_context=['        self.start()\n'], index=0), 
FrameInfo(frame=<frame object at 0x1f93678>, filename='/usr/local/lib/python3.6/dist-packages/distributed/nanny.py', lineno=781, function='_run', code_context=['            loop.run_sync(run)\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4fb501240>, filename='/usr/local/lib/python3.6/dist-packages/distributed/process.py', lineno=191, function='_run', code_context=['        target(*args, **kwargs)\n'], index=0), 
FrameInfo(frame=<frame object at 0x7fe4fb907a98>, filename='/usr/lib/python3.6/multiprocessing/process.py', lineno=93, function='run', code_context=['            self._target(*self._args, **self._kwargs)\n'], index=0), 
FrameInfo(frame=<frame object at 0x1f92b98>, filename='/usr/lib/python3.6/multiprocessing/process.py', lineno=258, function='_bootstrap', code_context=['                self.run()\n'], index=0), 
FrameInfo(frame=<frame object at 0x1747448>, filename='/usr/lib/python3.6/multiprocessing/spawn.py', lineno=118, function='_main', code_context=['    return self._bootstrap()\n'], index=0), 
FrameInfo(frame=<frame object at 0x16f6128>, filename='/usr/lib/python3.6/multiprocessing/spawn.py', lineno=105, function='spawn_main', code_context=['    exitcode = _main(fd)\n'], index=0), 
FrameInfo(frame=<frame object at 0x170d3a8>, filename='<string>', lineno=1, function='<module>', code_context=None, index=None)]

Those lines of code caused the first init() call, which looks like a normal use case to me.

https://github.com/gigony/test-numcodecs/blob/86358cc53264fae63a1421e7f423df131057c933/fail.py#L46

        from numcodecs import Blosc
        compressor = Blosc()
        x = da.random.random((100, 100, 3), chunks=(100, 100,3))
        x.to_zarr('fail.zarr', compressor=compressor, overwrite=True)

@quasiben

quasiben commented Jun 6, 2020

After further testing, the issue was resolved for my use case. I think my issue before was filling up temporary disk space

@rc-conway

did this PR end up working correctly?

@jakirkham
Member Author

Did you try it, @rc-conway? 🙂

@rc-conway

rc-conway commented Aug 18, 2020

@jakirkham I finally got the time to test your code.

I get the semaphore warning no matter what. I built a reduced test case after trying to update your branch for a fix:

# dummyblosc.pyx
import multiprocessing

MUTEX = None


def init():
    global MUTEX
    # synchronization
    try:
        MUTEX = multiprocessing.Lock()
    except OSError:
        MUTEX = None


def destroy():
    global MUTEX
    MUTEX = None
# basic.py
import atexit
import multiprocessing as mp
import time

import dummyblosc

dummyblosc.init()
atexit.register(dummyblosc.destroy)

def goofy(x):
    print(x)
    time.sleep(x)

def doit():
    with mp.Pool(2) as pool:
        pool.map(goofy, range(3))
    pool.join()

if __name__ == '__main__':
    mp.freeze_support()
    doit()
    print('Hooray!')

Yields

/usr/local/miniconda3/envs/my_env/lib/python3.8/multiprocessing/resource_tracker.py:203: UserWarning: resource_tracker: There appear to be 2 leaked semaphore objects to clean up at shutdown

The ONLY simple solution I have found to deal with this is to NOT call

dummyblosc.init()
atexit.register(dummyblosc.destroy)

at the global level in __init__.py, and to initialize it separately in a function call.

There are a couple of other ways to go about a fix for this with regard to creating the Lock, but the solution you provided does not work for me.

My current workaround is to only import zarr within function calls used in the child process
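That workaround can be sketched as follows; since zarr may not be installed everywhere, json stands in for it here so the snippet is runnable:

```python
import multiprocessing as mp

def worker(path):
    # Deferred import: the module is imported only inside the child process,
    # so any locks/semaphores it creates at import time belong to the child
    # and the parent's resource tracker never registers them. In the real
    # workaround this would be `import zarr`; `json` is a stand-in.
    import json
    return json.dumps({"path": path})

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        print(pool.map(worker, ["a.zarr", "b.zarr"]))
```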

FYI, this is a MacOS issue, I do not see this on Linux

@jstriebel
Member

FYI, this is a MacOS issue, I do not see this on Linux

We're also experiencing this on Linux, probably due to using spawn instead of fork (fork is the default on Unix). So this should be reproducible on Linux as well, using mp.set_start_method('spawn').
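A minimal sketch of forcing spawn on Linux (illustrative only, not numcodecs code) shows the same pattern as the call stacks above: each spawned child re-imports the module and re-creates any module-level Lock:

```python
import multiprocessing as mp

# Created at import time, like the mutex in numcodecs.blosc: under the
# 'spawn' start method every child re-imports this module and creates its
# own, separate lock.
lock = mp.Lock()

def work(x):
    return x * x

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    with mp.Pool(2) as pool:
        print(pool.map(work, range(4)))
```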

@joshmoore
Member

And this PR fixes the issue, @jstriebel?

@jakirkham
Member Author

Probably not unfortunately (given the other tests above). Maybe this code needs to be updated to not touch multiprocessing functions/objects

try:
    from numcodecs import blosc
    from numcodecs.blosc import Blosc
    register_codec(Blosc)
    # initialize blosc
    try:
        ncores = multiprocessing.cpu_count()
    except OSError:  # pragma: no cover
        ncores = 1
    blosc.init()
    blosc.set_nthreads(min(8, ncores))
    atexit.register(blosc.destroy)
except ImportError:  # pragma: no cover
    pass

That said, it is good to know it is a spawn issue. macOS uses spawn by default on Python 3.8+, so that explains why it was seen there primarily.

@jstriebel
Member

And this PR fixes the issue, @jstriebel?

Testing this didn't work easily, due to gcc errors when installing from git directly. Do you have wheel-releases for branches by any chance?

@jakirkham jakirkham closed this by deleting the head repository Oct 18, 2022
@jakirkham jakirkham reopened this Oct 18, 2022
@jakirkham jakirkham closed this Oct 18, 2022

Development

Successfully merging this pull request may close these issues.

numcodecs.blosc mutex leaked semaphore warning

6 participants