
WIP: Shared arrays #43

Closed
wants to merge 26 commits

Conversation

@ogrisel (Contributor) commented Jul 23, 2012

Early pull request to introduce a new data structure that blends the good features of np.memmap and multiprocessing.Array, for working with joblib.Parallel without exhausting memory when dealing with large data arrays.

This can be considered an alternative or complementary solution to PR #40 for issue #38.

TODO:

  • more tests (let's reach 99% coverage)
  • write docstrings
  • what should be SharedArray(10) + 3? A regular numpy array? If so, implement it and test it.
  • implement an as_shared_datastructure helper to reallocate scipy.sparse matrices and other nested data structures holding arrays, to make them easier to use in a multiprocessing context
  • integration with joblib.load and joblib.Memory.cache (maybe with a shared=True option)?
  • add a share_memory option to joblib.Parallel that calls as_shared_datastructure on the args?
  • some narrative documentation (once everything else is done)

@GaelVaroquaux (Member)

Nitpicks (I am starting with the nitpicks, because they don't require an understanding of the code):

  • I'd prefer the file to be named share_array.py
  • assharedarray -> as_shared_array. That way I don't read 'ass hared array'
  • In the tests, I'd like to see a test using Parallel that checks that we are indeed having a view.

Now to the big picture:

  • Do we want to have the same object to do anonymous and file-based memmap? I think that focusing on anonymous memmaps would simplify the code
  • Is there a reason to duplicate file-based memmaps functionality from numpy?
  • Right now, I believe that any array created from a SharedArray will be a SharedArray:
In [1]: from joblib import sharedarray

In [2]: a = np.zeros((10, 10))

In [3]: b = sharedarray.assharedarray(a)

In [4]: b
Out[4]: 
SharedArray([[ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.]])

In [5]: b[:2]
Out[5]: 
SharedArray([[ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.]])

In [6]: b[:2] + 3
Out[6]: 
SharedArray([[ 3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.],
       [ 3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.,  3.]])

However, the way that you have coded it, there is memmapping going on for Out[5] but not for Out[6] (because of the check you do). Thus the SharedArray in Out[6] is not actually usable in a multiprocessing context.

I do believe that this is a desired feature. If you code it the other way around, as people do their computations they will end up with heaps of shared arrays. Each one of these has an associated file descriptor, and at the end of the day you run out of file descriptors (the infamous 'too many open files' error). This happens to me when I work with memmapped arrays.

So, we want this feature that daughter arrays do not rely on memmapping, but right now it is confusing to the user, who has the impression that he has arrays that can be shared across processes. I am not sure how to address this problem, but I think that by using the priority mechanism and the inheritance model of numpy we can improve things. At the very least, we can probably avoid grand-daughter arrays being SharedArrays.

In this light, I suggest setting __array_priority__ to -9999: this means that when operating with other array subclasses, this subclass will always lose in the subclass-coercion mechanism (see http://docs.scipy.org/doc/numpy/reference/arrays.classes.html).

Also, I wonder if __array_prepare__ and __array_wrap__ could not be used in a clever way, so that in the cases where _mmap is None, standard ndarrays are created instead of SharedArrays.
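
To make this concrete, here is a minimal sketch (not the actual code of this PR) of what the suggestion could look like; the exact behaviour is an assumption drawn from this discussion:

import numpy as np

class SharedArray(np.ndarray):
    # Always lose against other subclasses in the coercion mechanism.
    __array_priority__ = -9999.0

    def __array_wrap__(self, out_arr, context=None):
        # Ufunc outputs are freshly allocated, hence not backed by the
        # shared mmap buffer: return a plain ndarray so the user is not
        # fooled into thinking the result can be shared across processes.
        return np.asarray(out_arr)

a = np.zeros((10, 10)).view(SharedArray)
print(type(a + 3))  # <class 'numpy.ndarray'>, not a fake SharedArray

With this pattern, arrays derived through operators degrade to regular ndarrays, while plain slicing still yields a SharedArray view of the same buffer.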

@ogrisel (Contributor, Author) commented Jul 24, 2012

I'd prefer the file to be named share_array.py

I suppose you meant shared_array.py

assharedarray -> as_shared_array. That way I don't read 'ass hared array'

Alright, I just wanted to be consistent with np.asanyarray

In the tests, I'd like to see a test using Parallel that checks that we are indeed having a view.

Yes this is planned. The lock feature is missing too.

Now to the big picture:

Do we want to have the same object to do anonymous and file-based memmap?
I think that focusing on anonymous memmaps would simplify the code

Indeed but they would share a lot of common code with the file-based variant.

Is there a reason to duplicate file-based memmaps functionality from numpy?

Yes, precisely to avoid the memory copy of memmaps when they are passed to multiprocessing (and to add the lock feature to them). This was my initial use case (the anonymous mode is an almost free bonus of this refactoring).

I will try to experiment with array priorities and do some open file descriptor profiling tonight. Thanks for this first review.

@ogrisel (Contributor, Author) commented Jul 24, 2012

Good news: for anonymous shared arrays, there is no attached file descriptor:

In [1]: from joblib.sharedarray import assharedarray

In [2]: import numpy as np

In [3]: a = np.zeros(10)

In [6]: %time l = [assharedarray(a) for _ in range(10000)]
CPU times: user 1.35 s, sys: 0.10 s, total: 1.45 s
Wall time: 1.46 s

In [7]: l[0] is l[1]
Out[7]: False

In [8]: l[0]
Out[8]: SharedArray([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.])

Before running this:

~> sudo lsof  | wc -l
    3445

After running this:

~> sudo lsof  | wc -l
    3449

@ogrisel (Contributor, Author) commented Jul 25, 2012

I decided to split the file-based and anonymous memory cases into distinct classes as you requested. I have also tried to summarize what remains to be done in the description of this PR, based on your feedback and other considerations.

@GaelVaroquaux (Member)

I decided to split the file-based and anonymous memory cases into distinct classes as you requested.

That was a suggestion, and not a request. I can be convinced otherwise.

I have also tried to summarize what remains to be done in the description of this PR, based on your feedback and other considerations.

Cool. I am not sure what the shared=True option in cache/load would be for.

@ogrisel (Contributor, Author) commented Jul 25, 2012

That was a suggestion, and not a request. I can be convinced otherwise.

The code is now much simpler so I think it's better this way :)

I am not sure what the shared=True option in cache/load would be for.

The goal would be to avoid a useless memory allocation of cached, deserialized results into a non-shareable array before feeding them to a Parallel call.

@ogrisel (Contributor, Author) commented Aug 1, 2012

Just a quick progress note on this. The coverage of the current implementation is good but there are still two outstanding issues:

  • I need to find out how to override __array_priority__ and maybe __array_wrap__ to make the SharedArray class behave correctly with operators (e.g. SharedArray(10) + 3 should return a regular numpy array, to make it explicit that a memory copy happens, instead of returning a fake SharedArray instance as is currently the case).
  • the current design can segfault if the original SharedArray is garbage collected while there are still pickled versions of it waiting, for instance, in a queue of a multiprocessing.Pool. This issue is probably best handled by spawning a custom multiprocessing.Manager that handles the allocation of the original shared memory and the reference counts from any SharedArray instance or pickle that needs it. More prototyping work is required to devise what is best and robust.

@GaelVaroquaux (Member)

OK, I think that I am going to do a new minor release of joblib without waiting for this PR to be merged. That way we can get out a bugfix-only release of joblib.

@ogrisel (Contributor, Author) commented Aug 1, 2012

No problem. Don't wait for me, this is still a WIP.

@glouppe commented Aug 21, 2012

@ogrisel Have you made progress on this? I am not so familiar with the joblib codebase, but if I can be of any help, please tell me! (I can dive into it.) Shared arrays are definitely something I'd like to see properly implemented.

@travisbot

This pull request fails (merged 54749eb into ad5fd41).

@ogrisel (Contributor, Author) commented Aug 21, 2012

@glouppe yes, I decided to stop using anonymous mmap, as it would make it much too complex to implement proper multiprocess garbage collection, and to use temporary files instead. That might incur some overhead though. I still have to implement early garbage collection + gc tests with pre-allocated multiprocessing pools + operator priorities.
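
For reference, a minimal sketch of the tempfile-backed idea (the helper name as_shared_memmap is hypothetical, not the API of this PR):

import os
import tempfile

import numpy as np

def as_shared_memmap(a, dir=None):
    """Copy array `a` into a temporary file and return a memmap view of it."""
    fd, filename = tempfile.mkstemp(suffix='.mmap', dir=dir)
    os.close(fd)
    m = np.memmap(filename, dtype=a.dtype, shape=a.shape, mode='w+')
    m[:] = a  # one copy at allocation time; afterwards the data lives in the file
    return m

Note that nothing deletes the temporary file here: that is exactly the early garbage collection problem mentioned above and discussed further below.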

@travisbot

This pull request fails (merged cf24d21 into ad5fd41).

@travisbot

This pull request fails (merged 8595102c into ad5fd41).

@travisbot

This pull request fails (merged d24e5d1 into 8aa6e48).

@glouppe commented Aug 27, 2012

@ogrisel How can I help you with any of these things? Feel free to delegate some work. I'd be glad to help.

@ogrisel (Contributor, Author) commented Aug 27, 2012

I have given it some thought this weekend and I cannot come up with a good solution anymore. The current code has two issues:

  • it leaks temporary files until the process exits, even if all the pickled and live instances are collected. One could add a method to let the user explicitly collect the tempfile when he / she knows that no more running or queued processes will need it in the future. I had started to implement some reference counting in shared memory for multiple processes, but this is too complicated to implement, or even impossible if the multiprocessing Pool is forked before the allocation of the shared array submitted to the pool queue.
  • overriding the __reduce__ method will make the joblib memoizer (the Memory.cache method) fail to detect changes in the actual data. It would need to be aware that the digest should be computed on the data buffer instead of on the result of a pickle (I need to check how it's implemented to know whether this is already the case or not). Anyway, that might break other usages of the pickler.

I am thinking that the best way to go would be not to use a custom __reduce__ method on the SharedArray and memmap classes, but instead to find a way to make our (joblib's) multiprocessing Pool instances use a Queue implementation that has a customized pickler.
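
To illustrate the custom-reducer idea, here is a sketch assuming Python 3, where multiprocessing queues serialize through ForkingPickler (on the Python 2 of the time, the registration would have to live in a custom Queue / pickler subclass, which is what is proposed above). This is not the code that eventually landed in joblib:

import numpy as np
from multiprocessing.reduction import ForkingPickler

def _rebuild_memmap(filename, dtype, shape, order, offset):
    # Re-open the backing file in the worker process: only metadata was pickled.
    return np.memmap(filename, dtype=dtype, shape=shape,
                     order=order, mode='r+', offset=offset)

def _reduce_memmap(m):
    order = 'F' if m.flags['F_CONTIGUOUS'] else 'C'
    return _rebuild_memmap, (m.filename, m.dtype, m.shape, order, m.offset)

# np.memmap arguments sent to worker processes are now serialized as a small
# (filename, dtype, shape, order, offset) tuple instead of a copy of the buffer.
ForkingPickler.register(np.memmap, _reduce_memmap)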

@GaelVaroquaux (Member)

@ogrisel: I cannot allocate time on this before the sprint, but I'd love to hash out these problems at the sprint.

@ogrisel (Contributor, Author) commented Aug 30, 2012

I have started to work on multiprocessing.Pool + multiprocessing.queues.SimpleQueue subclasses that make it possible to register custom reducers, and thus to handle mmap'ed arrays without subclassing them. That will be much cleaner IMHO. I hope I will have time to work on this this weekend and start experimenting on a branch for sklearn integration to address the RandomForest use case.

@ogrisel (Contributor, Author) commented Sep 9, 2012

I am closing this PR as the approach in #44 looks much better.
