Skip to content

Commit

Permalink
feat: cv.download_files (#532)
Browse files Browse the repository at this point in the history
* feat: cv.image.download_files

Download image files without rendering into a numpy array.

* feat(chunks): add remap operation

High efficiency for cseg and compresso

* wip: adding download_files to graphene

This will help with creating large agglomerated shards.

* wip: remap files directly

* fix: import and argument errors

* refactor: use download_chunks_threaded to ensure cache handling

* feat: add download_files to precomputed interface

* feat: add support for all options to download_files

* docs: show how to download files

* test: check output of download_files
  • Loading branch information
william-silversmith committed May 25, 2022
1 parent b802385 commit f1bea6c
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 14 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ vol[cfg.x: cfg.x + cfg.length, cfg.y:cfg.y + cfg.length, cfg.z: cfg.z + cfg.leng

### Examples

```python3
```python
# Basic Examples
vol = CloudVolume('gs://mybucket/retina/image')
vol = CloudVolume('gs://mybucket/retina/image', secrets=token, dict or json)
Expand All @@ -264,6 +264,8 @@ exists = vol.image.has_data(mip=0) # boolean check to see if any data is there
listing = vol.delete( np.s_[0:64, 0:128, 0:64] ) # delete this region (bbox must be chunk aligned)
vol[64:128, 64:128, 64:128] = image # Write a 64^3 image to the volume
img = vol.download_point( (x,y,z), size=256, mip=3 ) # download region around (mip 0) x,y,z at mip 3
# download image files without decompressing or rendering them. Good for caching!
files = vol.download_files(bbox, mip, decompress=False)

# Server
vol.viewer() # launches neuroglancer compatible web server on http://localhost:1337
Expand Down
27 changes: 24 additions & 3 deletions cloudvolume/chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Any, Optional, Sequence
from typing import Any, Optional, Sequence, Dict

import zlib
import io
Expand Down Expand Up @@ -253,9 +253,9 @@ def decode_compressed_segmentation_pure_python(bytestring, shape, dtype, block_s
return chunk.T

def labels(
filedata, encoding,
filedata:bytes, encoding:str,
shape=None, dtype=None,
block_size=None, background_color=0
block_size=None, background_color:int = 0
) -> np.ndarray:
"""
Extract unique labels from a chunk using
Expand All @@ -279,6 +279,27 @@ def labels(
else:
raise NotImplementedError(f"Encoding {encoding} is not supported. Try: raw, compressed_segmentation, or compresso.")

def remap(
filedata:bytes, encoding:str,
mapping:Dict[int,int],
preserve_missing_labels=False,
shape=None, dtype=None,
block_size=None
) -> bytes:
if filedata is None or len(filedata) == 0:
return filedata
elif encoding == "compressed_segmentation":
return cseg.remap(
filedata, shape, dtype, mapping,
preserve_missing_labels=preserve_missing_labels, block_size=block_size
)
elif encoding == "compresso":
return compresso.remap(filedata, mapping, preserve_missing_labels=preserve_missing_labels)
else:
img = decode(filedata, encoding, shape, dtype, block_size)
fastremap.remap(img, mapping, preserve_missing_labels=preserve_missing_labels, in_place=True)
return encode(img, encoding, block_size)

def read_voxel(
xyz:Sequence[int],
filedata:bytes,
Expand Down
2 changes: 1 addition & 1 deletion cloudvolume/datasource/precomputed/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
def content_type(encoding):
if encoding == 'jpeg':
return 'image/jpeg'
elif encoding in ('compressed_segmentation', 'fpzip', 'kempressed'):
elif encoding in ('compresso', 'compressed_segmentation', 'fpzip', 'kempressed'):
return 'image/x.' + encoding
return 'application/octet-stream'

Expand Down
56 changes: 54 additions & 2 deletions cloudvolume/datasource/precomputed/image/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def download(

if self.is_sharded(mip):
if renumber:
raise ValueError("renumber is only supported for non-shared volumes.")
raise ValueError("renumber is only supported for non-sharded volumes.")

scale = self.meta.scale(mip)
spec = sharding.ShardingSpecification.from_dict(scale['sharding'])
Expand Down Expand Up @@ -183,6 +183,58 @@ def download(
background_color=int(self.background_color),
)

def download_files(
self, bbox:Bbox, mip:int,
decompress:bool = True,
parallel:int = 1,
cache_only:bool = False
):
"""
Download the files that comprise a cutout image from the dataset
without rendering them into an image.
bbox: a Bbox object describing what region to download
mip: which resolution to fetch, 0 is the highest resolution
parallel: how many processes to use for downloading
cache_only: write downloaded files to cache and discard
the result to save memory
Returns:
If sharded:
{ morton_code: binary }
else:
{ path: binary }
"""
if self.autocrop:
bbox = Bbox.intersection(bbox, self.meta.bounds(mip))

self.check_bounded(bbox, mip)

if self.is_sharded(mip):
scale = self.meta.scale(mip)
spec = sharding.ShardingSpecification.from_dict(scale['sharding'])
return rx.download_raw_sharded(
bbox, mip,
self.meta, self.cache, spec,
decompress=decompress,
progress=self.config.progress,
)
else:
return rx.download_raw_unsharded(
bbox, mip,
meta=self.meta,
cache=self.cache,
decompress=decompress,
progress=self.config.progress,
parallel=parallel,
green=self.config.green,
secrets=self.config.secrets,
fill_missing=self.fill_missing,
compress_type=self.config.compress,
background_color=int(self.background_color),
cache_only=cache_only,
)

def unique(self, bbox:BboxLikeType, mip:int) -> set:
"""Extract unique values in an efficient way."""
bbox = Bbox.create(bbox, context=self.meta.bounds(mip))
Expand Down Expand Up @@ -445,7 +497,7 @@ def check(files):
for srcpaths in sip(cloudpaths, step):
files = check(cfsrc.get(srcpaths, raw=True))
cfdest.puts(
compression.transcode(files, encoding=compress, level=compress_level),
compression.transcode(files, encoding=compress, level=compress_level, in_place=True),
compress=compress,
content_type=tx.content_type(destvol),
raw=True
Expand Down
87 changes: 84 additions & 3 deletions cloudvolume/datasource/precomputed/image/rx.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,86 @@ def download_sharded(
requested_bbox
)

def download_raw_sharded(
requested_bbox, mip,
meta, cache, spec,
decompress, progress
):
"""
Download all the chunks without rendering.
"""
full_bbox = requested_bbox.expand_to_chunk_size(
meta.chunk_size(mip), offset=meta.voxel_offset(mip)
)
full_bbox = Bbox.clamp(full_bbox, meta.bounds(mip))

chunk_size = meta.chunk_size(mip)
grid_size = np.ceil(meta.bounds(mip).size3() / chunk_size).astype(np.uint32)

reader = sharding.ShardReader(meta, cache, spec)
bounds = meta.bounds(mip)

gpts = list(gridpoints(full_bbox, bounds, chunk_size))
morton_codes = compressed_morton_code(gpts, grid_size)
io_chunkdata = reader.get_data(
morton_codes, meta.key(mip),
progress=progress,
raw=(not decompress),
)
return io_chunkdata

def download_raw_unsharded(
requested_bbox, mip,
meta, cache,
decompress,
progress, parallel,
secrets, green, fill_missing,
compress_type, background_color,
cache_only
):
"""
Download all the chunks without rendering.
decompress: strip bytestream compression like gzip or br
leaving the image encoding untouched.
"""
full_bbox = requested_bbox.expand_to_chunk_size(
meta.chunk_size(mip), offset=meta.voxel_offset(mip)
)
full_bbox = Bbox.clamp(full_bbox, meta.bounds(mip))
cloudpaths = chunknames(
full_bbox, meta.bounds(mip),
meta.key(mip), meta.chunk_size(mip),
protocol=meta.path.protocol
)

results = {}
def store_result(binary, bbox):
nonlocal results
if cache_only:
return
key = meta.join(meta.key(mip), bbox.to_filename())
results[key] = binary

def noop_decode(
meta, input_bbox,
content, fill_missing,
mip, background_color=0
):
return content

compress_cache = should_compress(meta.encoding(mip), compress_type, cache, iscache=True)

download_chunks_threaded(
meta, cache,
lru=None, mip=mip, cloudpaths=cloudpaths,
fn=store_result, decode_fn=noop_decode, fill_missing=fill_missing,
progress=progress, compress_cache=compress_cache,
green=green, secrets=secrets, background_color=background_color
)

return results

def download(
requested_bbox, mip,
meta, cache, lru,
Expand Down Expand Up @@ -399,7 +479,7 @@ def download_chunk(
filename, fill_missing,
enable_cache, compress_cache,
secrets, background_color,
decode_fn
decode_fn, decompress=True
):
if lru is not None and filename in lru:
content = lru[filename]
Expand All @@ -418,7 +498,7 @@ def download_chunk(
)
del cache_content

if content is not None:
if content is not None and decompress:
content = compression.decompress(content, file['compress'])

if lru is not None:
Expand All @@ -433,6 +513,7 @@ def download_chunks_threaded(
meta, cache, lru, mip, cloudpaths, fn, decode_fn,
fill_missing, progress, compress_cache,
green=False, secrets=None, background_color=0,
decompress=True,
):
"""fn is the postprocess callback. decode_fn is a decode fn."""
locations = cache.compute_data_locations(cloudpaths)
Expand All @@ -444,7 +525,7 @@ def process(cloudpath, filename, enable_cache):
filename, fill_missing,
enable_cache, compress_cache,
secrets, background_color,
decode_fn
decode_fn, decompress
)
fn(labels, bbox)

Expand Down
10 changes: 7 additions & 3 deletions cloudvolume/datasource/precomputed/sharding.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import Optional

from collections import namedtuple, defaultdict
import copy
import json
Expand Down Expand Up @@ -540,15 +542,17 @@ def disassemble_shard(self, shard):
return shattered

def get_data(
self, label, path="",
progress=None, parallel=1
self, label:int, path:str = "",
progress:Optional[bool] = None, parallel:int = 1,
raw:bool = False
):
"""Fetches data from shards.
label: one or more segment ids
path: subdirectory path
progress: display progress bars
parallel: (int >= 0) use multiple processes
raw: if true, don't decompress or decode stream
Return:
if label is a scalar:
Expand Down Expand Up @@ -635,7 +639,7 @@ def get_data(
del bundles
del bundles_resp

if self.spec.data_encoding != 'raw':
if not raw and self.spec.data_encoding != 'raw':
for filepath, binary in tqdm(binaries.items(), desc="Decompressing", disable=(not progress)):
if binary is None:
continue
Expand Down
Loading

0 comments on commit f1bea6c

Please sign in to comment.