WIP: Multiprocessing - implemented a parallel_voxel_fit decorator #1135

Closed. Wants to merge 3 commits.
1 change: 1 addition & 0 deletions dipy/multi/__init__.py
@@ -0,0 +1 @@
# init for the parallel framework
80 changes: 80 additions & 0 deletions dipy/multi/config.py
@@ -0,0 +1,80 @@
"""
Configuration for multiprocessing
"""
from multiprocessing import cpu_count, Pool
from contextlib import contextmanager
import atexit


# Module-level state: the number of CPUs DIPY may use and the shared pool manager.
_dipy_num_cpu = 1
manager = None


def activate_multithreading(num_cpu=None):
"""
Function to activate multiprocessing.

Parameters
----------
num_cpu : int
Number of CPU's.
default: None
"""
global _dipy_num_cpu
global manager
if num_cpu is None:
_dipy_num_cpu = cpu_count()
elif num_cpu <= 0:
raise ValueError("num_cpu must be positive")
else:
_dipy_num_cpu = num_cpu

if manager is not None:
manager.shut_down()
# raise NotImplemented()
if _dipy_num_cpu > 1:
manager = PoolMananger(_dipy_num_cpu)
Review comment (Member): typo: PoolManager

else:
manager = None


def deactivate_multithreading():
"""
Function to deactivate multiprocessing.
"""
global _dipy_num_cpu
_dipy_num_cpu = 1


@contextmanager
def multithreading_on(num_cpu=None):
    """Context manager that enables multiprocessing for the enclosed block."""
    previous_state = _dipy_num_cpu
    activate_multithreading(num_cpu)
try:
yield
finally:
activate_multithreading(previous_state)


class PoolMananger(object):
Review comment (Member): Typo: PoolManager


active = False

def __init__(self, n_cpu):
self.n_cpu = n_cpu
self.pool = Pool(n_cpu)
self.active = True

def shut_down(self):
if self.active:
self.pool.close()
self.pool.join()
self.active = False
self.pool = None


def cleanup():
    """Shut down the worker pool, if any, when the interpreter exits."""
    if manager is not None:
        manager.shut_down()

atexit.register(cleanup)
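
For orientation, here is a minimal usage sketch of the configuration module added above (import paths follow the new dipy/multi package in this PR; the CPU count is illustrative):

    from dipy.multi.config import (activate_multithreading,
                                   deactivate_multithreading,
                                   multithreading_on)

    # Use four worker processes for everything that follows.
    activate_multithreading(4)
    # ... run fits ...
    deactivate_multithreading()

    # Or scope the worker pool to a block:
    with multithreading_on():   # None -> use all available CPUs
        pass                    # ... run fits ...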
229 changes: 229 additions & 0 deletions dipy/multi/parallel.py
@@ -0,0 +1,229 @@
"""Classes and functions to implement multiprocessing."""
from __future__ import division

from abc import abstractmethod
import time

import numpy as np

from dipy.multi.config import _dipy_num_cpu, manager
from dipy.reconst.multi_voxel import MultiVoxelFit


def update_outputs(index, result, outputs):
    """Copy the results for one voxel (or chunk) into the output arrays."""
    for key, array in outputs.items():
        array[index] = result[key]


class MultiVoxelFunction(object):
"""A function that can be applied to every voxel of a dMRI data set.
Subclass MultiVoxelFunction and define the _main and _default_values
methods to create a subclass. Both methods should return a dictionary of
outputs, where the keys are the names of the outputs and the values are
arrays. _main will be used in voxels where mask is true, and the result
from _default_values will be used in voxels where mask is False.
"""

@abstractmethod
def _main(self, single_voxel_data, *args, **kwargs):
raise NotImplementedError("Implement this method in a subclass.")

@abstractmethod
def _default_values(self, data, mask, *args, **kwargs):
raise NotImplementedError("Implement this method in a subclass.")

def _setup_outputs(self, data, mask, *args, **kwargs):
default_values = self._default_values(data, mask, *args, **kwargs)
outputs = {}
shape = mask.shape
ndim = len(shape)
for key, array in default_values.items():
out_array = np.empty(shape + array.shape, array.dtype)
out_array[...] = array
outputs[key] = out_array
return outputs

def _serial(self, data, mask, *args, **kwargs):
outputs = self._setup_outputs(data, mask, *args, **kwargs)
for ijk in np.ndindex(mask.shape):
if not mask[ijk]:
continue
vox = data[ijk]
result = self._main(vox, *args, **kwargs)
update_outputs(ijk, result, outputs)
return outputs

    def __call__(self, data, mask, *args, **kwargs):
        return self._serial(data, mask, *args, **kwargs)

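To make the subclassing contract above concrete, a hypothetical subclass might look like the sketch below (MeanSignal and the "mean" output key are illustrative names, not part of the PR):

    class MeanSignal(MultiVoxelFunction):
        """Toy example: per-voxel mean of the signal."""

        def _main(self, single_voxel_data, *args, **kwargs):
            # Called only where mask is True; keys name the output arrays.
            return {"mean": np.array(single_voxel_data.mean())}

        def _default_values(self, data, mask, *args, **kwargs):
            # Template value (shape and dtype) used where mask is False.
            return {"mean": np.array(np.nan)}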

class TrackProgress(object):

def __init__(self, n):
self.count = 0
self.total = n
self.reset()

def reset(self):
self.start = time.time()

def task_done(self):
duration = time.time() - self.start
self.count += 1
msg = "task %d out of %d done in %s sec"
Review comment (Member): Can you use "format"? E.g. msg = "task {0} out of {1} done in {2} sec".format(self.count, self.total, duration)

msg = msg % (self.count, self.total, duration)
print(msg)


class UpdateCallback(MultiVoxelFunction):

def __init__(self, index, outputs, errors, tracker):
self.index = index
self.outputs = outputs
self.errors = errors
self.tracker = tracker

def __call__(self, result):
update_outputs(self.index, result, self.outputs)
# Remove from errors to indicate successful completion and free memory.
self.errors.pop(repr(self.index))
self.tracker.task_done()


def _array_split_points(mask, num_chunks):
# TODO split input based on mask values so each thread gets about the same
Review comment (Member): each thread or process?

# number of voxels where mask is True.
cumsum = mask.cumsum()
num_chunks = min(num_chunks, cumsum[-1])
even_spacing = np.linspace(1, cumsum[-1], num_chunks + 1)

split_points = cumsum.searchsorted(even_spacing, 'left')
split_points[-1] += 1
assert (split_points[-1] == len(cumsum) or
cumsum[split_points[-1]] == cumsum[-1])
return split_points

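As a quick worked example of the splitting logic (the numbers follow from the code above): for a flattened mask with six True voxels, asking for three chunks yields boundaries that give every chunk the same number of True voxels.

    mask = np.array([1, 0, 1, 1, 0, 1, 1, 1], dtype=bool)
    _array_split_points(mask, 3)        # -> array([0, 3, 6, 8])
    # mask[0:3], mask[3:6] and mask[6:8] each contain exactly two True voxels.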

def call_remotely(parallel_function, *args, **kwargs):
return parallel_function._serial(*args, **kwargs)


class ParallelFunction(MultiVoxelFunction):

def _parallel(self, data, mask, *args, **kwargs):
ndim = mask.ndim
shape = mask.shape
if data.shape[:ndim] != shape:
raise ValueError("mask and data shapes do not match")

# Flatten the inputs
data = data.reshape((-1,) + data.shape[ndim:])
mask = mask.ravel()
size = mask.size
outputs = self._setup_outputs(data, mask, *args, **kwargs)

num_cpu = _dipy_num_cpu
num_chunks = 100 * num_cpu
split_points = _array_split_points(mask, num_chunks)
start = split_points[0]
end_points = split_points[1:]

# pool = Pool(num_cpu)
if manger is None:
Review comment (Member): typo: manager

raise ValueError()
Review comment (Member): I think you should explain what the error is; it seems we first need to call activate_multithreading().

pool = manager.pool
if pool is None:
raise ValueError()
Review comment (Member): Same as above. I think you need to explain this impossible value: num_cpu > 1 but the pool is None.


errors = {}
tracker = TrackProgress(len(split_points))
for end in end_points:
index = slice(start, end)
chunk_args = (self, data[index], mask[index]) + args
callback = UpdateCallback(index, outputs, errors, tracker)
r = pool.apply_async(call_remotely, chunk_args, kwargs,
callback=callback)
            # As of Python 2.7, the AsyncResult is the only way to track
            # errors; in Python 3 an error callback could be used instead.
errors[repr(index)] = r
start = end

del r
pool_done = False

while not pool_done:
time.sleep(3)
result = list(errors.values())
pool_done = all(r.ready() for r in result)

if errors:
index, r = errors.popitem()
# If errors is not empty, callbacks did not all execute, r.get()
# should raise an error.
r.get()
# If r.get() does not raise an error, something else went wrong.
msg = "Parallel function did not execute successfully."
raise RuntimeError(msg)

# Un-ravel the outputs
for key in outputs:
array = outputs[key]
outputs[key] = array.reshape(shape + array.shape[1:])

return outputs

def __call__(self, data, mask, *args, **kwargs):
if _dipy_num_cpu == 1:
return self._serial(data, mask, *args, **kwargs)
else:
return self._parallel(data, mask, *args, **kwargs)

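The dispatch-and-track pattern used by _parallel (submit each chunk with apply_async, key the AsyncResult by its slice, pop the key in the success callback, and re-raise from whatever is left) can be seen in isolation in the following self-contained sketch; it is independent of DIPY and all names in it are illustrative:

    from multiprocessing import Pool
    import time


    def work(chunk):
        return sum(chunk)


    if __name__ == "__main__":
        data = list(range(100))
        pending = {}        # chunk key -> AsyncResult, removed on success
        results = {}

        def make_callback(key):
            def callback(value):
                results[key] = value
                pending.pop(key)      # success: stop tracking this chunk
            return callback

        pool = Pool(2)
        for start in range(0, len(data), 25):
            key = (start, start + 25)
            r = pool.apply_async(work, (data[start:start + 25],),
                                 callback=make_callback(key))
            pending[key] = r          # kept so errors can be re-raised via get()

        # Wait until every chunk has finished, successfully or not.
        while not all(r.ready() for r in list(pending.values())):
            time.sleep(0.1)
        if pending:                   # a callback never ran: re-raise its error
            pending.popitem()[1].get()
        pool.close()
        pool.join()
        print(sum(results.values()))  # == sum(range(100))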

class ParallelFit(ParallelFunction):

def _main(self, single_voxel_data, model=None):
"""Fit method for every voxel in data"""
fit = model.fit(single_voxel_data)
return {"fit_array": fit}

def _default_values(self, data, mask, model=None):
return {"fit_array": np.array(None)}


parallel_fit = ParallelFit()

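parallel_fit is the module-level instance used by the decorator below, but it can also be called directly. A hedged sketch of that direct use (model stands for any model whose fit method accepts single-voxel data; the "fit_array" key comes from ParallelFit above):

    outputs = parallel_fit(data, mask, model=model)   # dict of output arrays
    fit_array = outputs["fit_array"]                  # object array of per-voxel fits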

def parallel_voxel_fit(single_voxel_fit):
"""Wraps single_voxel_fit method to turn a model into a multi voxel model.
Use this decorator on the fit method of your model to take advantage of the
MultiVoxelFit and ParallelFit machinery of dipy.
Parameters:
-----------
single_voxel_fit : callable
Should have a signature like: model [self when a model method is being
wrapped], data [single voxel data].
Returns:
--------
multi_voxel_fit_function : callable
Examples:
---------
>>> import numpy as np
>>> from dipy.reconst.base import ReconstModel, ReconstFit
>>> class Model(ReconstModel):
... @parallel_voxel_fit
... def fit(self, single_voxel_data):
... return ReconstFit(self, single_voxel_data.sum())
>>> model = Model(None)
>>> data = np.random.random((2, 3, 4, 5))
>>> fit = model.fit(data)
>>> np.allclose(fit.data, data.sum(-1))
True
"""

def new_fit(model, data, mask=None):
if data.ndim == 1:
return single_voxel_fit(model, data)
if mask is None:
mask = np.ones(data.shape[:-1], bool)
fit = parallel_fit(data, mask, model=model)
return MultiVoxelFit(model, fit["fit_array"], mask)

return new_fit
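
Putting the pieces together, here is a sketch of the intended end-to-end usage (the toy SumModel mirrors the docstring example above; note that dipy.multi.parallel imports _dipy_num_cpu from dipy.multi.config at import time, so whether the call actually runs in parallel depends on that imported value while the PR is still WIP):

    import numpy as np

    from dipy.multi.config import activate_multithreading, deactivate_multithreading
    from dipy.multi.parallel import parallel_voxel_fit
    from dipy.reconst.base import ReconstModel, ReconstFit


    class SumModel(ReconstModel):
        @parallel_voxel_fit
        def fit(self, single_voxel_data):
            return ReconstFit(self, single_voxel_data.sum())


    activate_multithreading()              # use all available CPUs
    data = np.random.random((10, 10, 10, 32))
    fit = SumModel(None).fit(data)         # MultiVoxelFit over the whole volume
    deactivate_multithreading()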