Skip to content

Commit

Permalink
Gevent (#174)
Browse files Browse the repository at this point in the history
* move cloudvolume object to function, do not keep them in memory

* tests works be removing the gevent feature

* add gevent monkey patch in the package init function since it is required to be setup before any other packages!
  • Loading branch information
jingpengw committed Apr 7, 2020
1 parent bbceda7 commit 62ce90b
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 80 deletions.
4 changes: 3 additions & 1 deletion chunkflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-


# Using Green Threads
import gevent.monkey
gevent.monkey.patch_all(thread=False)
51 changes: 26 additions & 25 deletions chunkflow/flow/cutout.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,21 @@ def __init__(self,
self.blackout_sections = blackout_sections
self.dry_run = dry_run

self.vol = CloudVolume(self.volume_path,
bounded=False,
fill_missing=self.fill_missing,
progress=self.verbose,
mip=self.mip,
cache=False,
parallel=1)

if blackout_sections:
with Storage(volume_path) as stor:
self.blackout_section_ids = stor.get_json(
'blackout_section_ids.json')['section_ids']

if self.validate_mip:
self.validate_vol = CloudVolume(self.volume_path,
bounded=False,
fill_missing=self.fill_missing,
progress=self.verbose,
mip=self.validate_mip,
cache=False,
parallel=1)

def __call__(self, output_bbox):

#gevent.monkey.patch_all(thread=False)
vol = CloudVolume(self.volume_path,
bounded=False,
fill_missing=self.fill_missing,
progress=self.verbose,
mip=self.mip,
cache=False,
green_threads=True)

chunk_slices = tuple(
slice(s.start - m, s.stop + m)
for s, m in zip(output_bbox.to_slices(), self.expand_margin_size))
Expand All @@ -66,7 +57,7 @@ def __call__(self, output_bbox):
self.volume_path))

# always reverse the indexes since cloudvolume use x,y,z indexing
chunk = self.vol[chunk_slices[::-1]]
chunk = vol[chunk_slices[::-1]]
# the cutout is fortran ordered, so need to transpose and make it C order
chunk = chunk.transpose()
# we can delay this transpose later
Expand All @@ -89,7 +80,8 @@ def __call__(self, output_bbox):
chunk = self._blackout_sections(chunk)

if self.validate_mip:
self._validate_chunk(chunk)
self._validate_chunk(chunk, vol)

return chunk

def _blackout_sections(self, chunk):
Expand All @@ -108,14 +100,23 @@ def _blackout_sections(self, chunk):
chunk[z0, :, :] = 0
return chunk

def _validate_chunk(self, chunk):
def _validate_chunk(self, chunk, vol):
"""
check that all the input voxels was downloaded without black region
We have found some black regions in previous inference run,
so hopefully this will solve the problem.
"""
if chunk.ndim == 4 and chunk.shape[0] > 1:
chunk = chunk[0, :, :, :]

validate_vol = CloudVolume(self.volume_path,
bounded=False,
fill_missing=self.fill_missing,
progress=self.verbose,
mip=self.validate_mip,
cache=False,
green_threads=True)


chunk_mip = self.mip
if self.verbose:
Expand All @@ -134,19 +135,19 @@ def _validate_chunk(self, chunk):
],
dtype=np.int32)
clamped_offset = tuple(go + f - (go - vo) % f for go, vo, f in zip(
global_offset[::-1], self.vol.voxel_offset, factor3))
global_offset[::-1], vol.voxel_offset, factor3))
clamped_stop = tuple(
go + s - (go + s - vo) % f
for go, s, vo, f in zip(global_offset[::-1], chunk.shape[::-1],
self.vol.voxel_offset, factor3))
vol.voxel_offset, factor3))
clamped_slices = tuple(
slice(o, s) for o, s in zip(clamped_offset, clamped_stop))
clamped_bbox = Bbox.from_slices(clamped_slices)
clamped_input = chunk.cutout(clamped_slices[::-1])
# transform to xyz order
clamped_input = np.transpose(clamped_input)
# get the corresponding bounding box for validation
validate_bbox = self.vol.bbox_to_mip(clamped_bbox,
validate_bbox = vol.bbox_to_mip(clamped_bbox,
mip=chunk_mip,
to_mip=self.validate_mip)
#validate_bbox = clamped_bbox // factor3
Expand All @@ -162,7 +163,7 @@ def _validate_chunk(self, chunk):
# validation by template matching
assert validate_by_template_matching(clamped_input)

validate_input = self.validate_vol[validate_bbox.to_slices()]
validate_input = validate_vol[validate_bbox.to_slices()]
if validate_input.shape[3] == 1:
validate_input = np.squeeze(validate_input, axis=3)

Expand Down
2 changes: 1 addition & 1 deletion chunkflow/flow/downsample_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import numpy as np
from cloudvolume.lib import Bbox


class DownsampleUploadOperator(OperatorBase):
"""
Multiple mip level downsampling including image and segmenation.
Expand Down Expand Up @@ -44,6 +43,7 @@ def __init__(self,
bounded=False,
autocrop=True,
mip=mip,
green_threads=True,
progress=verbose)

self.vols = vols
Expand Down
7 changes: 1 addition & 6 deletions chunkflow/flow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1252,21 +1252,16 @@ def quantize(tasks, name, input_chunk_name, output_chunk_name):
type=str, default=DEFAULT_CHUNK_NAME, help='input chunk name')
@click.option('--upload-log/--no-upload-log',
default=True, help='the log will be put inside volume-path')
@click.option('--nproc', '-p',
type=int, default=1,
help='number of processes, negative means using all the cores, ' +
'0/1 means turning off multiple processing, n>1 means using n processes')
@click.option('--create-thumbnail/--no-create-thumbnail',
default=False, help='create thumbnail or not. ' +
'the thumbnail is a downsampled and quantized version of the chunk.')
@operator
def save(tasks, name, volume_path, input_chunk_name, upload_log, nproc, create_thumbnail):
def save(tasks, name, volume_path, input_chunk_name, upload_log, create_thumbnail):
"""Save chunk to volume."""
state['operators'][name] = SaveOperator(volume_path,
state['mip'],
upload_log=upload_log,
create_thumbnail=create_thumbnail,
nproc=nproc,
verbose=state['verbose'],
name=name)

Expand Down
86 changes: 40 additions & 46 deletions chunkflow/flow/save.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,25 @@ def __init__(self,
mip: int,
upload_log: bool = True,
create_thumbnail: bool = False,
nproc: int = 1,
verbose: bool = True,
name: str = 'save'):
super().__init__(name=name, verbose=verbose)

if nproc==0:
print(yellow('parallel=0 in CloudVolume will create memory overhead!'))

self.upload_log = upload_log
self.create_thumbnail = create_thumbnail
self.mip = mip

self.volume = CloudVolume(
volume_path,
fill_missing=True,
bounded=False,
autocrop=True,
mip=mip,
cache=False,
parallel=nproc,
progress=verbose)
self.verbose = verbose
self.volume_path = volume_path

if upload_log:
log_path = os.path.join(volume_path, 'log')
self.log_storage = Storage(log_path)

if create_thumbnail:
thumbnail_layer_path = os.path.join(volume_path, 'thumbnail')
self.thumbnail_volume = CloudVolume(
os.path.join(volume_path, 'thumbnail'),
compress='gzip',
fill_missing=True,
bounded=False,
autocrop=True,
mip=mip,
cache=False,
parallel=nproc,
progress=verbose)
#self.thumbnail_operator = DownsampleUploadOperator(
# thumbnail_layer_path, chunk_mip = mip,
# start_mip = 6, stop_mip = 7,
# fill_missing = True,
# verbose = verbose
#)

def create_chunk_with_zeros(self, bbox):
def create_chunk_with_zeros(self, bbox, num_channels, dtype):
"""Create a fake all zero chunk.
this is used in skip some operation based on mask."""
shape = (self.volume.num_channels, *bbox.size3())
arr = np.zeros(shape, dtype=self.volume.dtype)
shape = (num_channels, *bbox.size3())
arr = np.zeros(shape, dtype=dtype)
chunk = Chunk(arr, global_offset=(0, *bbox.minpt))
return chunk

Expand All @@ -80,11 +49,23 @@ def __call__(self, chunk, log=None):
print('save chunk.')

start = time.time()
chunk = self._auto_convert_dtype(chunk)

# gevent.monkey.patch_all(thread=False)
volume = CloudVolume(
self.volume_path,
fill_missing=True,
bounded=False,
autocrop=True,
mip=self.mip,
cache=False,
green_threads=True,
progress=self.verbose)

chunk = self._auto_convert_dtype(chunk, volume)

# transpose czyx to xyzc order
arr = np.transpose(chunk.array)
self.volume[chunk.slices[::-1]] = arr
volume[chunk.slices[::-1]] = arr

if self.create_thumbnail:
self._create_thumbnail(chunk)
Expand All @@ -96,21 +77,34 @@ def __call__(self, chunk, log=None):
if self.upload_log:
self._upload_log(log, chunk.bbox)

def _auto_convert_dtype(self, chunk):
def _auto_convert_dtype(self, chunk, volume):
"""convert the data type to fit volume datatype"""
if self.volume.dtype != chunk.dtype:
if volume.dtype != chunk.dtype:
print(yellow(f'converting chunk data type {chunk.dtype} ' +
f'to volume data type: {self.volume.dtype}'))
#float_chunk = chunk.astype(np.float64)
#chunk = float_chunk / np.iinfo(chunk.dtype).max * np.iinfo(self.volume.dtype).max
chunk = chunk / chunk.array.max() * np.iinfo(self.volume.dtype).max
return chunk.astype(self.volume.dtype)
f'to volume data type: {volume.dtype}'))
# float_chunk = chunk.astype(np.float64)
# chunk = float_chunk / np.iinfo(chunk.dtype).max * np.iinfo(self.volume.dtype).max
chunk = chunk / chunk.array.max() * np.iinfo(volume.dtype).max
return chunk.astype(volume.dtype)
else:
return chunk

def _create_thumbnail(self, chunk):
if self.verbose:
print('creating thumbnail...')

thumbnail_layer_path = os.path.join(self.volume_path, 'thumbnail')
thumbnail_volume = CloudVolume(
thumbnail_layer_path,
compress='gzip',
fill_missing=True,
bounded=False,
autocrop=True,
mip=self.mip,
cache=False,
green_threads=True,
progress=self.verbose)

# only use the last channel, it is the Z affinity
# if this is affinitymap
image = chunk[-1, :, :, :]
Expand All @@ -124,7 +118,7 @@ def _create_thumbnail(self, chunk):

downsample_and_upload(image,
image_bbox,
self.thumbnail_volume,
thumbnail_volume,
Vec(*(image.shape)),
mip=self.mip,
max_mip=6,
Expand Down
6 changes: 6 additions & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Using Green Threads
import gevent.monkey
gevent.monkey.patch_all(thread=False)
1 change: 0 additions & 1 deletion tests/flow/test_save.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def test_save_image():
volume_path, 0,
upload_log = True,
create_thumbnail = False,
nproc = 1,
verbose = True,
name = 'save'
)
Expand Down

0 comments on commit 62ce90b

Please sign in to comment.