Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallelism to BossRemote#create_cutout #64

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

---

## v1.2.0

> This release adds support for parallel data uploads through `BossRemote#create_cutout`.

- Remotes
- `BossRemote`
- Parallel data upload with `BossRemote#create_cutout`

## v1.1.0 — July 27 2020

> Updates to the convenience API.
Expand Down
2 changes: 1 addition & 1 deletion intern/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from .convenience import array

__version__ = "1.0.0"
__version__ = "1.2.0"


def check_version():
Expand Down
2 changes: 1 addition & 1 deletion intern/remote/cv/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def create_cutout(self, resource, res, x_range, y_range, z_range, data):
vol (CloudVolume) : Existing cloudvolume instance
x_range (list) : x range within the 3D space
y_range (list) : y range within the 3D space
z_range (list) : z range witinn the 3D space
z_range (list) : z range within the 3D space
Retruns:
message (str) : Uploading Data... message
"""
Expand Down
7 changes: 4 additions & 3 deletions intern/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,10 @@ def get_cutout(self, resource, resolution, x_range, y_range, z_range, time_range
return self._volume.get_cutout(
resource, resolution,
x_range, y_range, z_range, time_range,
id_list, parallel = parallel, **kwargs
id_list, parallel=parallel, **kwargs
)

def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, time_range=None):
def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, time_range=None, parallel=True):
"""Upload a cutout to the volume service.

Args:
Expand All @@ -189,6 +189,7 @@ def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, t
z_range (list[int]): z range such as [10, 20] which means z>=10 and z<20.
data (object): Type depends on implementation.
time_range (optional [list[int]]): time range such as [30, 40] which means t>=30 and t<40.
parallel (Union[bool, int]: True): Parallel upload count, or True/False to use/avoid parallelism.

Returns:
(): Return type depends on volume service's implementation.
Expand All @@ -200,7 +201,7 @@ def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, t
if not resource.valid_volume():
raise RuntimeError('Resource incompatible with the volume service.')
return self._volume.create_cutout(
resource, resolution, x_range, y_range, z_range, data, time_range)
resource, resolution, x_range, y_range, z_range, data, time_range, parallel=parallel)

def reserve_ids(self, resource, num_ids):
"""Reserve a block of unique, sequential ids for annotations.
Expand Down
49 changes: 34 additions & 15 deletions intern/service/boss/v1/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def get_bit_width(self, resource):

def create_cutout(
self, resource, resolution, x_range, y_range, z_range, time_range, numpyVolume,
url_prefix, auth, session, send_opts):
url_prefix, auth, session, send_opts, parallel=True):
"""Upload a cutout to the Boss data store.

Args:
Expand All @@ -64,6 +64,7 @@ def create_cutout(
auth (string): Token to send in the request header.
session (requests.Session): HTTP session to use for request.
send_opts (dictionary): Additional arguments to pass to session.send().
parallel (Union[bool, int]: True): Parallel upload count, or True/False to use/avoid parallelism.
"""

if numpyVolume.ndim == 3:
Expand Down Expand Up @@ -96,20 +97,38 @@ def create_cutout(
block_size=(1024, 1024, 32)
)

for b in blocks:
_data = np.ascontiguousarray(
numpyVolume[
b[2][0] - z_range[0]: b[2][1] - z_range[0],
b[1][0] - y_range[0]: b[1][1] - y_range[0],
b[0][0] - x_range[0]: b[0][1] - x_range[0]
],
dtype=numpyVolume.dtype
)
self.create_cutout(
resource, resolution, b[0], b[1], b[2],
time_range, _data, url_prefix, auth, session, send_opts
)
return
if parallel:
pool = multiprocessing.Pool(processes=parallel if isinstance(parallel, int) and parallel > 0 else multiprocessing.cpu_count())
pool.starmap(self.create_cutout, [
(
resource, resolution, b[0], b[1], b[2],
time_range,
np.ascontiguousarray(
numpyVolume[
b[2][0] - z_range[0]: b[2][1] - z_range[0],
b[1][0] - y_range[0]: b[1][1] - y_range[0],
b[0][0] - x_range[0]: b[0][1] - x_range[0]
],
dtype=numpyVolume.dtype
),
url_prefix, auth, session, send_opts,
)
for b in blocks])
else:
for b in blocks:
_data = np.ascontiguousarray(
movestill marked this conversation as resolved.
Show resolved Hide resolved
numpyVolume[
b[2][0] - z_range[0]: b[2][1] - z_range[0],
b[1][0] - y_range[0]: b[1][1] - y_range[0],
b[0][0] - x_range[0]: b[0][1] - x_range[0]
],
dtype=numpyVolume.dtype
)
self.create_cutout(
resource, resolution, b[0], b[1], b[2],
time_range, _data, url_prefix, auth, session, send_opts
)
return

compressed = blosc.compress(
numpyVolume, typesize=self.get_bit_width(resource)
Expand Down
4 changes: 2 additions & 2 deletions intern/service/boss/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(self, base_url, version):

@check_channel
def create_cutout(
self, resource, resolution, x_range, y_range, z_range, numpyVolume, time_range=None):
self, resource, resolution, x_range, y_range, z_range, numpyVolume, time_range=None, **kwargs):
"""Upload a cutout to the volume service.

Args:
Expand All @@ -78,7 +78,7 @@ def create_cutout(

return self.service.create_cutout(
resource, resolution, x_range, y_range, z_range, time_range, numpyVolume,
self.url_prefix, self.auth, self.session, self.session_send_opts)
self.url_prefix, self.auth, self.session, self.session_send_opts, **kwargs)

@check_channel
def get_cutout(self, resource, resolution, x_range, y_range, z_range, time_range=None, id_list=[], access_mode=CacheMode.no_cache, **kwargs):
Expand Down