From 86aca36701e5b0cbdc3775be275d5aaab5a9c959 Mon Sep 17 00:00:00 2001
From: Jordan Matelsky
Date: Mon, 27 Jul 2020 14:17:43 -0400
Subject: [PATCH 1/5] Add parallelism to BossRemote#create_cutout

---
 CHANGELOG.md                     |  8 ++++++
 intern/__init__.py               |  2 +-
 intern/remote/cv/remote.py       |  2 +-
 intern/remote/remote.py          |  7 +++--
 intern/service/boss/v1/volume.py | 49 ++++++++++++++++++++++----------
 5 files changed, 48 insertions(+), 20 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5ca49fe7..3250280e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,14 @@
 
 ---
 
+## v1.2.0
+
+> This release adds support for parallel data uploads through `BossRemote#create_cutout`.
+
+- Remotes
+  - `BossRemote`
+    - Parallel data upload with `BossRemote#create_cutout`
+
 ## v1.1.0 — July 27 2020
 
 > Updates to the convenience API.

diff --git a/intern/__init__.py b/intern/__init__.py
index f6addb01..63b28bdb 100644
--- a/intern/__init__.py
+++ b/intern/__init__.py
@@ -4,7 +4,7 @@
 
 from .convenience import array
 
-__version__ = "1.0.0"
+__version__ = "1.2.0"
 
 
 def check_version():

diff --git a/intern/remote/cv/remote.py b/intern/remote/cv/remote.py
index 0fa73346..3406bb14 100644
--- a/intern/remote/cv/remote.py
+++ b/intern/remote/cv/remote.py
@@ -122,7 +122,7 @@ def create_cutout(self, resource, res, x_range, y_range, z_range, data):
             vol (CloudVolume) : Existing cloudvolume instance
             x_range (list) : x range within the 3D space
             y_range (list) : y range within the 3D space
-            z_range (list) : z range witinn the 3D space
+            z_range (list) : z range within the 3D space
         Retruns:
             message (str) : Uploading Data... message
         """

diff --git a/intern/remote/remote.py b/intern/remote/remote.py
index 81ddacb7..bbf33ec7 100644
--- a/intern/remote/remote.py
+++ b/intern/remote/remote.py
@@ -175,10 +175,10 @@ def get_cutout(self, resource, resolution, x_range, y_range, z_range, time_range
         return self._volume.get_cutout(
             resource, resolution, x_range, y_range, z_range, time_range,
-            id_list, parallel = parallel, **kwargs
+            id_list, parallel=parallel, **kwargs
         )
 
-    def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, time_range=None):
+    def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, time_range=None, parallel=True):
         """Upload a cutout to the volume service.
 
         Args:
@@ -189,6 +189,7 @@ def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, t
             z_range (list[int]): z range such as [10, 20] which means z>=10 and z<20.
             data (object): Type depends on implementation.
             time_range (optional [list[int]]): time range such as [30, 40] which means t>=30 and t<40.
+            parallel (Union[bool, int]): Number of parallel upload processes to use; pass True to use all available cores or False to upload serially. Defaults to True.
 
         Returns:
             (): Return type depends on volume service's implementation.
@@ -200,7 +201,7 @@ def create_cutout(self, resource, resolution, x_range, y_range, z_range, data, t
         if not resource.valid_volume():
             raise RuntimeError('Resource incompatible with the volume service.')
         return self._volume.create_cutout(
-            resource, resolution, x_range, y_range, z_range, data, time_range)
+            resource, resolution, x_range, y_range, z_range, data, time_range, parallel=parallel)
 
     def reserve_ids(self, resource, num_ids):
        """Reserve a block of unique, sequential ids for annotations.
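For reference, here is a minimal usage sketch of the new parallel keyword that this patch threads through BossRemote#create_cutout. The host, token, and collection/experiment/channel names below are placeholders, and the channel is assumed to already exist with a uint8 datatype:

    from intern.remote.boss import BossRemote
    from intern.resource.boss.resource import ChannelResource
    import numpy as np

    rmt = BossRemote({"protocol": "https", "host": "api.bossdb.io", "token": "<your-token>"})
    channel = rmt.get_project(ChannelResource("my_channel", "my_collection", "my_experiment"))

    # Cutout data is Z, Y, X ordered, matching the z/y/x ranges passed below.
    data = np.random.randint(0, 255, size=(16, 512, 512), dtype="uint8")

    # parallel=True (the default) uses one upload process per CPU core, an integer
    # caps the number of upload processes, and parallel=False keeps the original
    # serial upload path.
    rmt.create_cutout(channel, 0, [0, 512], [0, 512], [0, 16], data, parallel=4)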
diff --git a/intern/service/boss/v1/volume.py b/intern/service/boss/v1/volume.py
index 40c51e41..cb864397 100644
--- a/intern/service/boss/v1/volume.py
+++ b/intern/service/boss/v1/volume.py
@@ -49,7 +49,7 @@ def get_bit_width(self, resource):
 
     def create_cutout(
             self, resource, resolution, x_range, y_range, z_range, time_range, numpyVolume,
-            url_prefix, auth, session, send_opts):
+            url_prefix, auth, session, send_opts, parallel=True):
         """Upload a cutout to the Boss data store.
 
         Args:
@@ -64,6 +64,7 @@ def create_cutout(
             auth (string): Token to send in the request header.
             session (requests.Session): HTTP session to use for request.
             send_opts (dictionary): Additional arguments to pass to session.send().
+            parallel (Union[bool, int]): Number of parallel upload processes to use; pass True to use all available cores or False to upload serially. Defaults to True.
         """
 
         if numpyVolume.ndim == 3:
@@ -96,20 +97,38 @@ def create_cutout(
                 block_size=(1024, 1024, 32)
             )
 
-            for b in blocks:
-                _data = np.ascontiguousarray(
-                    numpyVolume[
-                        b[2][0] - z_range[0]: b[2][1] - z_range[0],
-                        b[1][0] - y_range[0]: b[1][1] - y_range[0],
-                        b[0][0] - x_range[0]: b[0][1] - x_range[0]
-                    ],
-                    dtype=numpyVolume.dtype
-                )
-                self.create_cutout(
-                    resource, resolution, b[0], b[1], b[2],
-                    time_range, _data, url_prefix, auth, session, send_opts
-                )
-            return
+            if parallel:
+                pool = multiprocessing.Pool(processes=parallel if isinstance(parallel, int) and not isinstance(parallel, bool) and parallel > 0 else multiprocessing.cpu_count())
+                pool.starmap(self.create_cutout, [
+                    (
+                        resource, resolution, b[0], b[1], b[2],
+                        time_range,
+                        np.ascontiguousarray(
+                            numpyVolume[
+                                b[2][0] - z_range[0]: b[2][1] - z_range[0],
+                                b[1][0] - y_range[0]: b[1][1] - y_range[0],
+                                b[0][0] - x_range[0]: b[0][1] - x_range[0]
+                            ],
+                            dtype=numpyVolume.dtype
+                        ),
+                        url_prefix, auth, session, send_opts,
+                    )
+                    for b in blocks])
+            else:
+                for b in blocks:
+                    _data = np.ascontiguousarray(
+                        numpyVolume[
+                            b[2][0] - z_range[0]: b[2][1] - z_range[0],
+                            b[1][0] - y_range[0]: b[1][1] - y_range[0],
+                            b[0][0] - x_range[0]: b[0][1] - x_range[0]
+                        ],
+                        dtype=numpyVolume.dtype
+                    )
+                    self.create_cutout(
+                        resource, resolution, b[0], b[1], b[2],
+                        time_range, _data, url_prefix, auth, session, send_opts
+                    )
+            return
 
         compressed = blosc.compress(
             numpyVolume, typesize=self.get_bit_width(resource)

From 13fe8b479aae6a245052eb6ccc2a0d526b34eed8 Mon Sep 17 00:00:00 2001
From: Jordan Matelsky
Date: Mon, 27 Jul 2020 14:37:13 -0400
Subject: [PATCH 2/5] Add kwargs passthrough to create_cutout

---
 intern/service/boss/volume.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/intern/service/boss/volume.py b/intern/service/boss/volume.py
index 2b38413c..4d6a0584 100644
--- a/intern/service/boss/volume.py
+++ b/intern/service/boss/volume.py
@@ -62,7 +62,7 @@ def __init__(self, base_url, version):
 
     @check_channel
     def create_cutout(
-            self, resource, resolution, x_range, y_range, z_range, numpyVolume, time_range=None):
+            self, resource, resolution, x_range, y_range, z_range, numpyVolume, time_range=None, **kwargs):
         """Upload a cutout to the volume service.
 
         Args:
@@ -78,7 +78,7 @@
 
         return self.service.create_cutout(
             resource, resolution, x_range, y_range, z_range, time_range, numpyVolume,
-            self.url_prefix, self.auth, self.session, self.session_send_opts)
+            self.url_prefix, self.auth, self.session, self.session_send_opts, **kwargs)
 
     @check_channel
     def get_cutout(self, resource, resolution, x_range, y_range, z_range, time_range=None, id_list=[], access_mode=CacheMode.no_cache, **kwargs):

From b76a6303c99158294af348b1096eed6988884b50 Mon Sep 17 00:00:00 2001
From: Jordan Matelsky
Date: Wed, 29 Jul 2020 23:43:44 -0400
Subject: [PATCH 3/5] Update CHANGELOG.md

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3250280e..43528d6d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 - Remotes
   - `BossRemote`
     - Parallel data upload with `BossRemote#create_cutout`
+    - `create_cutout` ignores all-zero data uploads for better performance and lower network IO
 
 ## v1.1.0 — July 27 2020
 

From 4e5392b80f50117572888ac666ce25755bcb30ae Mon Sep 17 00:00:00 2001
From: Jordan Matelsky
Date: Sun, 21 Mar 2021 17:21:48 -0400
Subject: [PATCH 4/5] Pull array construction into a helper function

---
 intern/service/boss/v1/volume.py | 44 ++++++++++++++++++++++++++++----------------
 1 file changed, 28 insertions(+), 16 deletions(-)

diff --git a/intern/service/boss/v1/volume.py b/intern/service/boss/v1/volume.py
index 192764ae..cadefe0c 100644
--- a/intern/service/boss/v1/volume.py
+++ b/intern/service/boss/v1/volume.py
@@ -69,6 +69,20 @@ def create_cutout(
         if np.sum(numpyVolume) == 0:
             return
 
+        if parallel:
+            # Parallel uploads are faster with a smaller chunk size, but too small a chunk
+            # can easily overwhelm the endpoint. From empirical testing, (512, 512, 96) is
+            # usually the fastest; there is some variability with the number of threads.
+            chunk_size = (512, 512, 16 * 6)
+        else:
+            # Single-threaded uploads are faster with a large chunk size, but can't surpass
+            # the 500 MB limit. To stay within the 500 MB constraint with 64-bit data, we chose
+            # a chunk size of (512, 512, 192), which is about 402 MB.
+            chunk_size = (512, 512, 16 * 12)
+
+        # TODO: magic number
+        chunk_limit = (chunk_size[0] * chunk_size[1] * chunk_size[2]) * 1.2
+
         if numpyVolume.ndim == 3:
             # Can't have time
             if time_range is not None:
@@ -99,33 +113,31 @@
                 block_size=(1024, 1024, 32)
             )
 
+            def _create_new_contiguous_array(b, z_range, y_range, x_range):
+                # Create a new C-ordered numpy array. Used below to construct
+                # each block.
+                return np.ascontiguousarray(
+                    numpyVolume[
+                        b[2][0] - z_range[0]: b[2][1] - z_range[0],
+                        b[1][0] - y_range[0]: b[1][1] - y_range[0],
+                        b[0][0] - x_range[0]: b[0][1] - x_range[0]
+                    ],
+                    dtype=numpyVolume.dtype
+                )
+
             if parallel:
                 pool = multiprocessing.Pool(processes=parallel if isinstance(parallel, int) and not isinstance(parallel, bool) and parallel > 0 else multiprocessing.cpu_count())
                 pool.starmap(self.create_cutout, [
                     (
                         resource, resolution, b[0], b[1], b[2],
                         time_range,
-                        np.ascontiguousarray(
-                            numpyVolume[
-                                b[2][0] - z_range[0]: b[2][1] - z_range[0],
-                                b[1][0] - y_range[0]: b[1][1] - y_range[0],
-                                b[0][0] - x_range[0]: b[0][1] - x_range[0]
-                            ],
-                            dtype=numpyVolume.dtype
-                        ),
+                        _create_new_contiguous_array(b, z_range, y_range, x_range),
                         url_prefix, auth, session, send_opts,
                     )
                     for b in blocks])
             else:
                 for b in blocks:
-                    _data = np.ascontiguousarray(
-                        numpyVolume[
-                            b[2][0] - z_range[0]: b[2][1] - z_range[0],
-                            b[1][0] - y_range[0]: b[1][1] - y_range[0],
-                            b[0][0] - x_range[0]: b[0][1] - x_range[0]
-                        ],
-                        dtype=numpyVolume.dtype
-                    )
+                    _data = _create_new_contiguous_array(b, z_range, y_range, x_range)
                     self.create_cutout(
                         resource, resolution, b[0], b[1], b[2],
                         time_range, _data, url_prefix, auth, session, send_opts
                     )

From e6d77f09530c5e35a1306c5e6aca3a7af8051c66 Mon Sep 17 00:00:00 2001
From: Jordan Matelsky
Date: Sun, 21 Mar 2021 17:23:13 -0400
Subject: [PATCH 5/5] Temporarily remove upload chunk size

---
 intern/service/boss/v1/volume.py | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/intern/service/boss/v1/volume.py b/intern/service/boss/v1/volume.py
index cadefe0c..db076274 100644
--- a/intern/service/boss/v1/volume.py
+++ b/intern/service/boss/v1/volume.py
@@ -69,20 +69,6 @@ def create_cutout(
         if np.sum(numpyVolume) == 0:
             return
 
-        if parallel:
-            # Parallel uploads are faster with a smaller chunk size, but too small a chunk
-            # can easily overwhelm the endpoint. From empirical testing, (512, 512, 96) is
-            # usually the fastest; there is some variability with the number of threads.
-            chunk_size = (512, 512, 16 * 6)
-        else:
-            # Single-threaded uploads are faster with a large chunk size, but can't surpass
-            # the 500 MB limit. To stay within the 500 MB constraint with 64-bit data, we chose
-            # a chunk size of (512, 512, 192), which is about 402 MB.
-            chunk_size = (512, 512, 16 * 12)
-
-        # TODO: magic number
-        chunk_limit = (chunk_size[0] * chunk_size[1] * chunk_size[2]) * 1.2
-
         if numpyVolume.ndim == 3:
             # Can't have time
             if time_range is not None:
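As a quick check of the 500 MB request budget behind the chunk sizes introduced in PATCH 4 and temporarily removed above, assuming the worst case of 64-bit voxels (8 bytes each):

    # Single-threaded chunk of (512, 512, 192) voxels at 8 bytes per voxel:
    512 * 512 * 192 * 8   # = 402,653,184 bytes, roughly 402 MB, under the 500 MB cap
    # Per-worker parallel chunk of (512, 512, 96) voxels at 8 bytes per voxel:
    512 * 512 * 96 * 8    # = 201,326,592 bytes, roughly 201 MB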