WIP : Multiprocessing - implemented a parallel_voxel_fit decorator #1135

Closed · wants to merge 3 commits into nipy:master from sahmed95:multiprocessing

Conversation
@sahmed95 (Contributor) commented Oct 22, 2016

This is the original work of @MrBago from his branch. I have refactored the code a little and added a new decorator parallel_voxel_fit similar to the multi_voxel_fit. Both can be merged into one later.

The current implementation adopts his code in dipy/multi. The decorator parallel_voxel_fit can be used directly in modules such as dipy/reconst/dsi.py in the following manner (this is also a test case):

class TestModel(ReconstModel):
    """
    Reconst model to test `parallel_voxel_fit` decorator.
    """
    @parallel_voxel_fit
    def fit(self, single_voxel_data):
        return ReconstFit(self, single_voxel_data.sum())
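
For orientation, here is a minimal, self-contained sketch of what a decorator like this could do. It is not the implementation in this PR (which routes work through a shared pool sized by _dipy_num_cpu); flattening the voxels into a list and returning a plain list of fits are simplifying assumptions:

import multiprocessing
from functools import wraps

import numpy as np


def _voxel_worker(model, voxel):
    # Recover the undecorated single-voxel fit through the __wrapped__
    # attribute that functools.wraps sets; this assumes the decorated
    # method is named `fit`, as in TestModel above.
    return type(model).fit.__wrapped__(model, voxel)


def parallel_voxel_fit(single_voxel_fit):
    @wraps(single_voxel_fit)
    def new_fit(self, data, mask=None):
        # The model and the per-voxel arrays must be picklable.
        if mask is None:
            mask = np.ones(data.shape[:-1], dtype=bool)
        voxels = [data[idx] for idx in np.ndindex(data.shape[:-1])
                  if mask[idx]]
        with multiprocessing.Pool() as pool:
            fits = pool.starmap(_voxel_worker,
                                [(self, vox) for vox in voxels])
        return fits  # the real code rebuilds a multi-voxel fit from these
    return new_fit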

To do:

  • Discuss a better way to activate or deactivate multiprocessing than setting a global _dipy_num_cpu variable. (Is it okay? See the sketch after this list.)
  • Improve documentation; refactor and optimize the current code.
  • Profile the performance on multicore CPUs.
  • Better handling of masks and splitting of voxels for fitting.
  • Simplify the current multi-voxel models into single-voxel fitting by using the decorators parallel_voxel_fit or multi_voxel_fit. (For example, in reconst/dti.py and reconst/dki.py multi-voxel fitting is carried out by iterating over voxels, whereas in reconst/dsi.py the multi_voxel_fit decorator is used and the fit method is written for a single voxel.)
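
On the first point above, one possible alternative to a module-level flag would be scoping the pool with a context manager. This is only a sketch, and parallel_pool is an invented name, not part of this PR:

import multiprocessing
from contextlib import contextmanager


@contextmanager
def parallel_pool(num_cpu=None):
    # Scoped activation: the pool only exists inside the `with` block,
    # so no global state has to be toggled on and off.
    pool = multiprocessing.Pool(num_cpu)
    try:
        yield pool
    finally:
        pool.close()
        pool.join()

Usage would then look like `with parallel_pool(4) as pool: fits = pool.map(...)` instead of flipping a module-level variable.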
@codecov-io commented Oct 23, 2016

Codecov Report

Merging #1135 into master will increase coverage by 4.95%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master    #1135      +/-   ##
==========================================
+ Coverage   80.86%   85.82%   +4.95%     
==========================================
  Files         217      217              
  Lines       24593    26045    +1452     
  Branches     2491     2671     +180     
==========================================
+ Hits        19888    22352    +2464     
+ Misses       4194     3038    -1156     
- Partials      511      655     +144
Impacted Files                     Coverage Δ
dipy/viz/__init__.py               45.45% <ø> (-54.55%)
dipy/utils/arrfuncs.py             56.41% <ø> (-38.47%)
dipy/fixes/scipy.py                31.25% <ø> (-31.25%)
dipy/core/optimize.py              65.92% <ø> (-26.67%)
dipy/core/tests/test_optimize.py   77.27% <ø> (-20.46%)
dipy/reconst/ivim.py               79.5%  <ø> (-9.94%)
dipy/utils/optpkg.py               64.28% <ø> (-3.58%)
dipy/reconst/shm.py                86.84% <ø> (-2.97%)
dipy/core/gradients.py             97%    <ø> (-2%)
dipy/data/__init__.py              89.33% <ø> (-1.34%)
... and 51 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update da31980...503c1e1.

@coveralls commented Oct 23, 2016

Coverage Status

Coverage remained the same at 82.938% when pulling f707aa5 on sahmed95:multiprocessing into da31980 on nipy:master.


@MrBago (Contributor) commented Oct 24, 2016

@sahmed95 thanks for pushing this along! One thing I ran into when writing this: it might be a good idea to launch the Pool once (either on startup or the first time a parallel function is called) and re-use it across parallel calls. If you do this, I believe you need to register a "pool exit" function with Python, or come up with some other way of shutting the pool down.
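
A minimal sketch of that idea, assuming a module-level _pool (the PR's own PoolMananger class organizes this differently):

import atexit
import multiprocessing

_pool = None  # created lazily, shared across parallel calls


def _shutdown_pool():
    global _pool
    if _pool is not None:
        _pool.close()
        _pool.join()
        _pool = None


def get_pool(num_cpu=None):
    # Create the shared pool on first use and register a "pool exit"
    # handler so workers are cleaned up at interpreter shutdown.
    global _pool
    if _pool is None:
        _pool = multiprocessing.Pool(num_cpu)
        atexit.register(_shutdown_pool)
    return _pool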

The other thing that's closely related to this pull request is the implementation of a "peaks from fit" type function. This function should be like peaks_from_model, but take a Fit or MultiVoxelFit object as input.
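
Purely as an illustration of the idea, a single-voxel version might look like the sketch below. peaks_from_fit is a hypothetical name, and it assumes the fit object exposes odf(); dipy's peak_directions helper does exist:

from dipy.direction import peak_directions


def peaks_from_fit(fit, sphere, relative_peak_threshold=0.5,
                   min_separation_angle=25):
    # Reuse an already-computed fit instead of refitting the model,
    # which is what peaks_from_model would do.
    odf = fit.odf(sphere)
    return peak_directions(odf, sphere, relative_peak_threshold,
                           min_separation_angle)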

I also have a few other ideas about where this kind of framework could go :), but I'll save those for later. Let me know if I can answer any questions or help in some other way.

@Garyfallidis (Member) commented Nov 1, 2016

All good points. Currently peaks_from_model uses a lot of memory when parallelized. We need to make sure that this does not happen in the new version, where the parallelization has moved to the fitting step.

@sahmed95 (Contributor) commented Dec 25, 2016

We should implement the pool manager from the latest commit in the original code.

@Garyfallidis (Member) commented Feb 17, 2017

@sahmed95 what is the state of this PR? Can you bring us up to speed? It would be nice if we could make this work.

Commits added: "Added MrBago's commits for pool launch at startup", "config.py file added for multiprocessing"
@sahmed95 (Contributor) commented Feb 17, 2017

Hi, I have updated the code, implementing the pool manager at launch by @MrBago. To use multiprocessing, we just need to replace the decorator multi_voxel_fit with parallel_voxel_fit at places like this. Regarding the peaks_from_model type function, my understanding is that we need a way to specify whether the fit should use multiprocessing or not. This is currently done by replacing the multi_voxel_fit decorator with parallel_voxel_fit. Instead, we want something like reconst_model.fit(data, parallel=True) to activate multiprocessing. Is that correct?
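
One hedged sketch of how such a keyword could dispatch between the two decorators (SomeModel and the _serial_fit/_parallel_fit helper names are assumptions about a possible design, not code from this PR):

from dipy.reconst.base import ReconstModel, ReconstFit
from dipy.reconst.multi_voxel import multi_voxel_fit
from dipy.multi import parallel_voxel_fit  # module added in this PR


class SomeModel(ReconstModel):
    def _single_voxel_fit(self, single_voxel_data):
        return ReconstFit(self, single_voxel_data.sum())

    # The same single-voxel routine, wrapped by each decorator.
    _serial_fit = multi_voxel_fit(_single_voxel_fit)
    _parallel_fit = parallel_voxel_fit(_single_voxel_fit)

    def fit(self, data, mask=None, parallel=False):
        # parallel=True switches to the pool-backed path.
        if parallel:
            return self._parallel_fit(data, mask)
        return self._serial_fit(data, mask)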

> Currently the peaks_from_model uses a lot of memory when parallelized.

Do we have an idea why that happens?

@sahmed95 sahmed95 closed this Feb 22, 2017

@sahmed95 sahmed95 reopened this Feb 22, 2017

@coveralls commented Feb 22, 2017

Coverage Status

Coverage increased (+5.4%) to 88.318% when pulling 503c1e1 on sahmed95:multiprocessing into da31980 on nipy:master.


@skoudoro (Member) left a comment

First review: I do not know how I can help with this PR. Can I commit some modifications, or should I fork this branch? I like the idea of reconst_model.fit(data, parallel=True) activating multiprocessing.

@@ -10,7 +10,7 @@
 manager = None


-def activate_multiprocessing(num_cpu=None):
+def activate_multithreading(num_cpu=None):
     """

@skoudoro (Member), Jul 25, 2017:

Why did you rename it to multithreading, since it is multiprocessing? I think it is better to revert this; it will be more consistent with the docs.

 if _dipy_num_cpu > 1:
     manager = PoolMananger(_dipy_num_cpu)
 else:
     manager = None


-def deactivate_multiprocessing():
+def deactivate_multithreading():
     """

@skoudoro (Member), Jul 25, 2017:

Same as above.

activate_multithreading(previous_state)


class PoolMananger(object):

@skoudoro (Member), Jul 25, 2017:

Typo: PoolManager.

def task_done(self):
    duration = time.time() - self.start
    self.count += 1
    msg = "task %d out of %d done in %s sec"

@skoudoro (Member), Jul 25, 2017:

Can you use "format"? msg = "task {0} out of {1} done in {2} sec".format(self.count, self.total, duration)



def _array_split_points(mask, num_chunks):
    # TODO split input based on mask values so each thread gets about the same

@skoudoro (Member), Jul 25, 2017:

Each thread, or process?

manager.shut_down()
# raise NotImplemented()
if _dipy_num_cpu > 1:
    manager = PoolMananger(_dipy_num_cpu)

@skoudoro (Member), Jul 25, 2017:

Typo: PoolManager.


# pool = Pool(num_cpu)
if manger is None:
    raise ValueError()

@skoudoro (Member), Jul 25, 2017:

I think you should explain what the error is; it seems we first need to call activate_multiprocessing().
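
For instance (the exact wording is an assumption; `manager` is the module-level variable from this PR):

def _get_pool():
    # Raise an informative error instead of a bare ValueError.
    if manager is None:
        raise ValueError("no multiprocessing pool is available; "
                         "call activate_multiprocessing() first")
    return manager.pool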

raise ValueError()
pool = manager.pool
if pool is None:
    raise ValueError()

@skoudoro (Member), Jul 25, 2017:

Same as above. I think you need to explain this impossible value (num_cpu > 1).

brokenfunction = BrokenFunction()


def test_parallelFunction():

@skoudoro (Member), Jul 25, 2017:

PEP8: function names should be lowercase.

b = maxcumsum(data, mask, weights, return_cumsum=True)
deactivate_multithreading()
cumsum = (data * mask[..., None] * weights).cumsum(-1)
max = data.max(-1)

@skoudoro (Member), Jul 25, 2017:

max is a built-in function; can you rename this variable?

@skoudoro (Member) commented Sep 15, 2018

closing in favor of #1418

@skoudoro skoudoro closed this Sep 15, 2018
