forked from scikit-image/scikit-image
-
Notifications
You must be signed in to change notification settings - Fork 0
/
apply_parallel.py
211 lines (181 loc) · 8.13 KB
/
apply_parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import numpy
from .._shared.utils import channel_as_last_axis, deprecate_multichannel_kwarg
__all__ = ['apply_parallel']
def _get_chunks(shape, ncpu):
"""Split the array into equal sized chunks based on the number of
available processors. The last chunk in each dimension absorbs the
remainder array elements if the number of CPUs does not divide evenly into
the number of array elements.
Examples
--------
>>> _get_chunks((4, 4), 4)
((2, 2), (2, 2))
>>> _get_chunks((4, 4), 2)
((2, 2), (4,))
>>> _get_chunks((5, 5), 2)
((2, 3), (5,))
>>> _get_chunks((2, 4), 2)
((1, 1), (4,))
"""
# since apply_parallel is in the critical import path, we lazy import
# math just when we need it.
from math import ceil
chunks = []
nchunks_per_dim = int(ceil(ncpu ** (1./len(shape))))
used_chunks = 1
for i in shape:
if used_chunks < ncpu:
regular_chunk = i // nchunks_per_dim
remainder_chunk = regular_chunk + (i % nchunks_per_dim)
if regular_chunk == 0:
chunk_lens = (remainder_chunk,)
else:
chunk_lens = ((regular_chunk,) * (nchunks_per_dim - 1) +
(remainder_chunk,))
else:
chunk_lens = (i,)
chunks.append(chunk_lens)
used_chunks *= nchunks_per_dim
return tuple(chunks)
def _ensure_dask_array(array, chunks=None):
    """Return ``array`` as a dask array, wrapping it with the given chunking
    if it is not one already.
    """
    import dask.array as da

    already_dask = isinstance(array, da.Array)
    return array if already_dask else da.from_array(array, chunks=chunks)
@deprecate_multichannel_kwarg()
def apply_parallel(function, array, chunks=None, depth=0, mode=None,
                   extra_arguments=(), extra_keywords=None, *, dtype=None,
                   compute=None, channel_axis=None,
                   multichannel=False):
    """Map a function in parallel across an array.

    Split an array into possibly overlapping chunks of a given depth and
    boundary type, call the given function in parallel on the chunks, combine
    the chunks and return the resulting array.

    Parameters
    ----------
    function : function
        Function to be mapped which takes an array as an argument.
    array : numpy array or dask array
        Array which the function will be applied to.
    chunks : int, tuple, or tuple of tuples, optional
        A single integer is interpreted as the length of one side of a square
        chunk that should be tiled across the array. One tuple of length
        ``array.ndim`` represents the shape of a chunk, and it is tiled across
        the array. A list of tuples of length ``ndim``, where each sub-tuple
        is a sequence of chunk sizes along the corresponding dimension. If
        None, the array is broken up into chunks based on the number of
        available cpus. More information about chunks is in the documentation
        `here <https://dask.pydata.org/en/latest/array-design.html>`_. When
        `channel_axis` is not None, the tuples can be length ``ndim - 1`` and
        a single chunk will be used along the channel axis.
    depth : int or sequence of int, optional
        The depth of the added boundary cells. A tuple can be used to specify a
        different depth per array axis. Defaults to zero. When `channel_axis`
        is not None, and a tuple of length ``ndim - 1`` is provided, a depth of
        0 will be used along the channel axis.
    mode : {'reflect', 'symmetric', 'periodic', 'wrap', 'nearest', 'edge'}, optional
        Type of external boundary padding.
    extra_arguments : tuple, optional
        Tuple of arguments to be passed to the function.
    extra_keywords : dictionary, optional
        Dictionary of keyword arguments to be passed to the function.
        If None (default), no extra keywords are passed.
    dtype : data-type or None, optional
        The data-type of the `function` output. If None, Dask will attempt to
        infer this by calling the function on data of shape ``(1,) * ndim``.
        For functions expecting RGB or multichannel data this may be
        problematic. In such cases, the user should manually specify this dtype
        argument instead.

        .. versionadded:: 0.18
           ``dtype`` was added in 0.18.
    compute : bool, optional
        If ``True``, compute eagerly returning a NumPy Array.
        If ``False``, compute lazily returning a Dask Array.
        If ``None`` (default), compute based on array type provided
        (eagerly for NumPy Arrays and lazily for Dask Arrays).
    channel_axis : int or None, optional
        If None, the image is assumed to be a grayscale (single channel) image.
        Otherwise, this parameter indicates which axis of the array corresponds
        to channels.
    multichannel : bool, optional
        If `chunks` is None and `multichannel` is True, this function will keep
        only a single chunk along the channels axis. When `depth` is specified
        as a scalar value, that depth will be applied only to the non-channels
        axes (a depth of 0 will be used along the channels axis). If the user
        manually specified both `chunks` and a `depth` tuple, then this
        argument will have no effect. This argument is deprecated: specify
        `channel_axis` instead.

        .. versionadded:: 0.18
           ``multichannel`` was added in 0.18.

    Returns
    -------
    out : ndarray or dask Array
        Returns the result of the applying the operation.
        Type is dependent on the ``compute`` argument.

    Notes
    -----
    Numpy edge modes 'symmetric', 'wrap', and 'edge' are converted to the
    equivalent ``dask`` boundary modes 'reflect', 'periodic' and 'nearest',
    respectively.
    Setting ``compute=False`` can be useful for chaining later operations.
    For example region selection to preview a result or storing large data
    to disk instead of loading in memory.
    """
    try:
        # Importing dask takes time. since apply_parallel is on the
        # minimum import path of skimage, we lazy attempt to import dask
        import dask.array as da
    except ImportError:
        raise RuntimeError("Could not import 'dask'. Please install "
                           "using 'pip install dask'")

    # Use None as the default so a shared mutable dict is never created at
    # definition time (the mutable-default-argument pitfall): the dict is
    # captured by the `wrapped_func` closure below, so a module-level `{}`
    # default would be shared across all calls.
    if extra_keywords is None:
        extra_keywords = {}

    if compute is None:
        # Eager for NumPy input, lazy for Dask input.
        compute = not isinstance(array, da.Array)

    if channel_axis is not None:
        # Normalize negative axis indices to the [0, ndim) range.
        channel_axis = channel_axis % array.ndim

    if chunks is None:
        shape = array.shape
        try:
            # since apply_parallel is in the critical import path, we lazy
            # import multiprocessing just when we need it.
            from multiprocessing import cpu_count
            ncpu = cpu_count()
        except NotImplementedError:
            # cpu_count can be unavailable on some platforms; fall back.
            ncpu = 4
        if channel_axis is not None:
            # use a single chunk along the channel axis
            spatial_shape = shape[:channel_axis] + shape[channel_axis + 1:]
            chunks = list(_get_chunks(spatial_shape, ncpu))
            chunks.insert(channel_axis, shape[channel_axis])
            chunks = tuple(chunks)
        else:
            chunks = _get_chunks(shape, ncpu)
    elif channel_axis is not None and len(chunks) == array.ndim - 1:
        # insert a single chunk along the channel axis
        chunks = list(chunks)
        chunks.insert(channel_axis, array.shape[channel_axis])
        chunks = tuple(chunks)

    # Map NumPy boundary mode names onto their Dask equivalents.
    if mode == 'wrap':
        mode = 'periodic'
    elif mode == 'symmetric':
        mode = 'reflect'
    elif mode == 'edge':
        mode = 'nearest'
    elif mode is None:
        # default value for Dask.
        # Note: that for dask >= 2022.03 it will change to 'none' so we set it
        # here for consistent behavior across Dask versions.
        mode = 'reflect'

    if channel_axis is not None:
        if numpy.isscalar(depth):
            # depth is zero along channel_axis
            depth = [depth] * (array.ndim - 1)
        depth = list(depth)
        if len(depth) == array.ndim - 1:
            depth.insert(channel_axis, 0)
        depth = tuple(depth)

    def wrapped_func(arr):
        # Bind the user-supplied extras so map_overlap only needs the chunk.
        return function(arr, *extra_arguments, **extra_keywords)

    darr = _ensure_dask_array(array, chunks=chunks)

    res = darr.map_overlap(wrapped_func, depth, boundary=mode, dtype=dtype)
    if compute:
        res = res.compute()
    return res