
Multiprocess access to Zarr groups hangs indefinitely #199

Closed
CSNoyes opened this issue Nov 24, 2017 · 3 comments

CSNoyes commented Nov 24, 2017

Notes: I have tested pickling processed_zarr and passing it into each process, reopening it from scratch in each process, and running with and without a process synchronizer. Running with a concurrent.futures.ThreadPoolExecutor works, but I'm doing some blocking compute on each slice, so true multiprocess access would be really useful.

The MVP below can be run from scratch.

import zarr
import numpy as np
from pprint import pprint
import multiprocessing
from zarr import blosc
blosc.set_nthreads(20)
blosc.use_threads = True


synchronizer = zarr.ProcessSynchronizer('example.sync')
processed_zarr = zarr.hierarchy.open_group("test.zarr", 'a', synchronizer=synchronizer)

features_arr = np.random.random_sample((10000,20))
processed_zarr.create_dataset("features_arr", data=features_arr, shape=features_arr.shape, dtype="float64", overwrite=True)

ixs = np.arange(processed_zarr["features_arr"].shape[0])
slices = np.linspace(0, processed_zarr["features_arr"].shape[0]-1, 100, dtype=np.int32)

sliceIter = []
for i in range(len(slices)-1):
    sliceIter.append({
        "min" : ixs[slices[i]],
        "max" : ixs[slices[i+1]],
        "slice_num" : i,
    })
pprint(sliceIter)

### slices breaks the np.arange over processed_zarr["features_arr"] into evenly spaced slice boundaries via np.linspace
def mem_instantiate(param_dict):
    min_ix = param_dict["min"]
    max_ix = param_dict["max"]
    slice_num = param_dict["slice_num"]

    ### never gets past loading the features
    instantiated_features = processed_zarr["features_arr"][min_ix:max_ix]
    print(slice_num, "features loaded")


pool = multiprocessing.Pool(processes=5)
pool.map(mem_instantiate, sliceIter)
pool.close()
pool.join()
alimanfoo (Member) commented

I think this is down to an interaction with blosc when blosc tries to use multiple threads internally. If I set blosc.use_threads = False, or try any other compressor, this works fine.

I don't fully understand what's happening here, but IIRC blosc's multi-threaded "global" versions of compress/decompress maintain some global state, and so are not designed for use in a multi-threaded or multi-process context. When blosc.use_threads = False, blosc's "contextual" versions of compress/decompress are used instead; each of these maintains local state and so is safe to use in a multi-threaded or multi-process context.

Btw, if you are only reading (not writing) from multiple processes, there is no need for a synchronizer. The hang has nothing to do with the synchronizer; it is getting locked up inside blosc.
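
For reference, a minimal sketch of that workaround applied to the reproduction above; the worker function name, slice bounds, and pool size are illustrative and not taken from the original script:

import multiprocessing

import numpy as np
import zarr
from zarr import blosc

# Force blosc's "contextual" (locally scoped) compress/decompress, which are
# safe to call from multiple processes. With the spawn start method each
# worker re-imports this module and picks the setting up again.
blosc.use_threads = False


def load_slice(bounds):
    # Re-open the group inside the worker instead of pickling it; no
    # synchronizer is needed for read-only access.
    group = zarr.open_group("test.zarr", mode="r")
    lo, hi = bounds
    return group["features_arr"][lo:hi].shape


if __name__ == "__main__":
    group = zarr.open_group("test.zarr", mode="a")
    group.create_dataset(
        "features_arr",
        data=np.random.random_sample((10000, 20)),
        dtype="float64",
        overwrite=True,
    )

    edges = np.linspace(0, 10000, 11, dtype=int)
    bounds = list(zip(edges[:-1], edges[1:]))

    with multiprocessing.Pool(processes=5) as pool:
        print(pool.map(load_slice, bounds))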

CSNoyes commented Nov 24, 2017

Got it; I tested without use_threads and that works. Thanks very much for the help. Mind if I make a pull request against the docs to add a note about implementing manual multiprocess access? I mostly use Dask, but this was pretty unintuitive -- I figured it had something to do with a shared object causing infinite context switching (based on cProfile, this looked like it was caused by blosc).

@alimanfoo added this to the v2.2 milestone Nov 24, 2017
@alimanfoo added the documentation label Nov 24, 2017
alimanfoo (Member) commented

Resolved via #201.

@alimanfoo added the release notes done label Nov 24, 2017