Skip to content

Commit

Permalink
Merge 6aa2943 into cfe35da
Browse files Browse the repository at this point in the history
  • Loading branch information
jingpengw committed Mar 9, 2019
2 parents cfe35da + 6aa2943 commit bffab4f
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 70 deletions.
178 changes: 118 additions & 60 deletions chunkflow/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@
}


def initialize_task():
    """Build a fresh task dictionary populated with default values.

    Returns:
        dict: a new task whose ``'skip'`` flag is ``False`` and whose
        ``'log'`` entry holds an empty ``'timer'`` dictionary.
    """
    # every call builds new nested dictionaries, so tasks never share state
    defaults = dict(skip=False, log=dict(timer=dict()))
    return defaults


def handle_task_skip(task, name):
    """Stop skipping once the pipeline reaches the operator it skipped to.

    Args:
        task (dict): task dictionary, modified in place. ``task['skip']``
            is always read; ``task['skip_to']`` is read only when
            ``task['skip']`` is truthy.
        name (str): name of the operator about to handle the task.
    """
    if not task['skip']:
        # nothing is being skipped, leave the task untouched
        return
    if task['skip_to'] == name:
        # have already skipped to target operator
        task['skip'] = False

def default_none(ctx, param, value):
"""
click currently can not use None with tuple type
Expand Down Expand Up @@ -96,9 +110,10 @@ def new_func(stream, *args, **kwargs):
@generator
def create_chunk_cmd(size, dtype, voxel_offset):
"""[generator] Create a fake chunk for easy test."""
task = initialize_task()
create_chunk_operator = CreateChunkOperator()
chunk = create_chunk_operator(size=size, dtype=dtype, voxel_offset=voxel_offset)
yield {'chunk': chunk}
task['chunk'] = create_chunk_operator(size=size, dtype=dtype, voxel_offset=voxel_offset)
yield task


@cli.command('read-file')
Expand All @@ -109,23 +124,10 @@ def create_chunk_cmd(size, dtype, voxel_offset):
@generator
def read_file_cmd(file_name, offset):
"""[generator] Read HDF5 and tiff files."""
task = initialize_task()
read_file_operator = ReadFileOperator()
chunk = read_file_operator(file_name, global_offset=offset)
yield {'chunk': chunk}


@cli.command('write-h5')
@click.option('--name', type=str, default='write-h5', help='name of this operator')
@click.option('--file-name', type=str, required=True,
help='file name of hdf5 file, the extention should be .h5')
@operator
def write_h5_cmd(tasks, name, file_name):
"""[operator] Write chunk to HDF5 file."""
state['operators'][name] = WriteH5Operator()
for task in tasks:
state['operators'][name](task['chunk'], file_name)
# keep the pipeline going
yield task
task['chunk'] = read_file_operator(file_name, global_offset=offset)
yield task


@cli.command('generate-task')
Expand All @@ -137,7 +139,7 @@ def write_h5_cmd(tasks, name, file_name):
@generator
def generate_task_cmd(queue_name, offset, shape, visibility_timeout):
"""[generator] Create task or fetch task from queue."""
task = {'log': {'timer': {}}}
task = initialize_task()
if not queue_name:
# no queue name specified
# will only run one task
Expand All @@ -159,17 +161,35 @@ def generate_task_cmd(queue_name, offset, shape, visibility_timeout):
yield task


@cli.command('write-h5')
@click.option('--name', type=str, default='write-h5', help='name of this operator')
@click.option('--file-name', type=str, required=True,
              help='file name of hdf5 file, the extention should be .h5')
@operator
def write_h5_cmd(tasks, name, file_name):
    """[operator] Write chunk to HDF5 file."""
    # register the operator under its name in the shared pipeline state
    state['operators'][name] = WriteH5Operator()
    for task in tasks:
        # clear the skip flag if this operator is the skip target
        handle_task_skip(task, name)
        if task['skip']:
            # a skipped task is passed along without being written
            yield task
            continue
        state['operators'][name](task['chunk'], file_name)
        # keep the pipeline going
        yield task


@cli.command('delete-task-in-queue')
@click.option('--name', type=str, default='delete-task-in-queue', help='name of this operator')
@operator
def delete_task_in_queue_cmd(tasks, name):
"""[operator] Delete the task in queue."""
for task in tasks:
queue = task['queue']
task_handle = task['task_handle']
queue.delete(task_handle)
if state['verbose']:
print('deleted task {} in queue: {}'.format(task_handle, queue))
handle_task_skip(task, name)
if not task['skip']:
queue = task['queue']
task_handle = task['task_handle']
queue.delete(task_handle)
if state['verbose']:
print('deleted task {} in queue: {}'.format(task_handle, queue))


@cli.command('cutout')
Expand Down Expand Up @@ -199,10 +219,12 @@ def cutout_cmd(tasks, name, volume_path, mip, expand_margin_size,
name=name)

for task in tasks:
start = time()
task['chunk'] = state['operators'][name](task['output_bbox'])
task['log']['timer'][name] = time() - start
task['cutout_volume_path'] = volume_path
handle_task_skip(task, name)
if not task['skip']:
start = time()
task['chunk'] = state['operators'][name](task['output_bbox'])
task['log']['timer'][name] = time() - start
task['cutout_volume_path'] = volume_path
yield task


Expand Down Expand Up @@ -231,9 +253,11 @@ def normalize_contrast_contrast_cmd(tasks, name, levels_path, mip, clip_fraction
levels_path, mip, clip_fraction, minval=minval, maxval=maxval, name=name)

for task in tasks:
start = time()
task['chunk'] = state['operators'][name](task['chunk'])
task['log']['timer'][name] = time() - start
handle_task_skip(task, name)
if not task['skip']:
start = time()
task['chunk'] = state['operators'][name](task['chunk'])
task['log']['timer'][name] = time() - start
yield task


Expand Down Expand Up @@ -268,12 +292,14 @@ def inference_cmd(tasks, name, convnet_model, convnet_weight_path, patch_size,
verbose=state['verbose'], name=name)

for task in tasks:
if 'log' not in task:
task['log'] = {'timer': {}}
start = time()
task['chunk'] = state['operators'][name](task['chunk'])
task['log']['timer'][name] = time() - start
task['log']['compute_device'] = state['operators'][name].compute_device
handle_task_skip(task, name)
if not task['skip']:
if 'log' not in task:
task['log'] = {'timer': {}}
start = time()
task['chunk'] = state['operators'][name](task['chunk'])
task['log']['timer'][name] = time() - start
task['log']['compute_device'] = state['operators'][name].compute_device
yield task


Expand All @@ -287,23 +313,39 @@ def inference_cmd(tasks, name, convnet_model, convnet_weight_path, patch_size,
@click.option('--fill-missing/--no-fill-missing', default=False,
help='fill missing blocks with black or not. ' +
'default is False.')
@click.option('--check-all-zero/--maskout', default=False,
help='default is doing maskout. '+
'check all zero will return boolean result.')
@click.option('--skip-to', type=str, default='save',
help='skip to a operator')
@operator
def mask_cmd(tasks, name, volume_path, mip, inverse, fill_missing):
def mask_cmd(tasks, name, volume_path, mip, inverse, fill_missing,
check_all_zero, skip_to):
"""[operator] Mask the chunk. The mask could be in higher mip level and we
will automatically upsample it to the same mip level with chunk.
"""
state['operators'][name] = MaskOperator(volume_path, mip, state['mip'],
inverse=inverse,
fill_missing=fill_missing,
verbose=state['verbose'],
name=name)
state['operators'][name] = MaskOperator(volume_path, mip, state['mip'],
inverse=inverse,
fill_missing=fill_missing,
check_all_zero=check_all_zero,
verbose=state['verbose'],
name=name)

for task in tasks:
start = time()
task['chunk'] = state['operators'][name](task['chunk'])
# Note that mask operation could be used several times,
# this will only record the last masking operation
task['log']['timer'][name] = time() - start
handle_task_skip(task, name)
if not task['skip']:
start = time()
if check_all_zero:
# skip following operators since the mask is all zero after required inverse
task['skip'] = state['operators'][name].is_all_zero(task['output_bbox'])
if task['skip']:
print('the mask of {} is all zero, will skip to {}'.format(name, skip_to))
task['skip_to'] = skip_to
else:
task['chunk'] = state['operators'][name](task['chunk'])
# Note that mask operation could be used several times,
# this will only record the last masking operation
task['log']['timer'][name] = time() - start
yield task


Expand All @@ -319,11 +361,14 @@ def crop_margin_cmd(tasks, name, margin_size):
state['operators'][name] = CropMarginOperator(margin_size=margin_size,
verbose=state['verbose'],
name=name)

for task in tasks:
start = time()
task['chunk'] = state['operators'][name](task['chunk'],
output_bbox=task['output_bbox'])
task['log']['timer'][name] = time() - start
handle_task_skip(task, name)
if not task['skip']:
start = time()
task['chunk'] = state['operators'][name](task['chunk'],
output_bbox=task['output_bbox'])
task['log']['timer'][name] = time() - start
yield task


Expand All @@ -348,11 +393,18 @@ def save_cmd(tasks, name, volume_path, upload_log, nproc, create_thumbnail):
verbose=state['verbose'], name=name)

for task in tasks:
# the time elapsed was recorded internally
state['operators'][name](
task['chunk'], log=task.get('log', {'timer':{}}),
output_bbox=task.get('output_bbox', None))
task['output_volume_path'] = volume_path
# we got a special case for handling skip
if task['skip'] and task['skip_to']==name:
task['skip'] = False
# create fake chunk to save
task['chunk'] = state['operators'][name].create_chunk_with_zeros(task['output_bbox'])

if not task['skip']:
# the time elapsed was recorded internally
state['operators'][name](
task['chunk'], log=task.get('log', {'timer':{}}),
output_bbox=task.get('output_bbox', None))
task['output_volume_path'] = volume_path
yield task


Expand All @@ -365,7 +417,9 @@ def cloud_watch_cmd(tasks, name, log_name):
state['operators'][name]=CloudWatchOperator(log_name=log_name, name=name,
verbose=state['verbose'])
for task in tasks:
state['operators'][name](task['log'])
handle_task_skip(task, name)
if not task['skip']:
state['operators'][name](task['log'])
yield task


Expand All @@ -376,7 +430,9 @@ def view_cmd(tasks, name):
"""[operator] Visualize the chunk using cloudvolume view in browser."""
state['operators'][name] = ViewOperator(name=name)
for task in tasks:
state['operators'][name](task['chunk'])
handle_task_skip(task, name)
if not task['skip']:
state['operators'][name](task['chunk'])
yield task


Expand All @@ -389,7 +445,9 @@ def neuroglancer_cmd(tasks, name, voxel_size):
"""[operator] Visualize the chunk using neuroglancer."""
state['operators'][name] = NeuroglancerViewOperator(name=name)
for task in tasks:
state['operators'][name]([task['chunk'],], voxel_size=voxel_size)
handle_task_skip(task, name)
if not task['skip']:
state['operators'][name]([task['chunk'],], voxel_size=voxel_size)
yield task


Expand Down
37 changes: 28 additions & 9 deletions chunkflow/operators/mask.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
from cloudvolume import CloudVolume
from cloudvolume.lib import Bbox

from chunkflow.chunk import Chunk
from .operator_base import OperatorBase


class MaskOperator(OperatorBase):
def __init__(self, volume_path: str, mask_mip: int, chunk_mip: int,
inverse: bool=False, fill_missing: bool=False,
inverse: bool=False, fill_missing: bool=False,
check_all_zero=False,
verbose: bool=True, name: str='mask'):
super().__init__(name=name)

Expand All @@ -18,6 +20,7 @@ def __init__(self, volume_path: str, mask_mip: int, chunk_mip: int,
self.inverse = inverse
self.verbose = verbose
self.volume_path = volume_path
self.check_all_zero = check_all_zero

self.mask_vol = CloudVolume(
volume_path,
Expand All @@ -29,8 +32,24 @@ def __init__(self, volume_path: str, mask_mip: int, chunk_mip: int,
if verbose:
print("mask chunk at mip {} using {}".format(
mask_mip, volume_path))

def __call__(self, x):
    """Dispatch to the all-zero check or to the mask-out operation.

    Args:
        x: a ``Bbox`` when ``self.check_all_zero`` is set, otherwise a
            ``Chunk``.

    Returns:
        The result of ``self.is_all_zero(x)`` when ``self.check_all_zero``
        is set, otherwise the result of ``self.maskout(x)``.
    """
    if not self.check_all_zero:
        # normal mode: mask out the given chunk
        assert isinstance(x, Chunk)
        return self.maskout(x)

    # check mode: only report whether the mask in this bounding box is zero
    assert isinstance(x, Bbox)
    return self.is_all_zero(x)

def is_all_zero(self, bbox):
    """Tell whether the mask read for ``bbox`` contains only zeros.

    Args:
        bbox: bounding box handed unchanged to ``self._read_mask``.

    Returns:
        bool: ``True`` when every element of the mask returned by
        ``self._read_mask(bbox)`` equals zero, ``False`` otherwise.
    """
    mask_in_high_mip = self._read_mask(bbox)
    # np.alltrue was deprecated and then removed in NumPy 2.0; np.all is
    # the supported equivalent. bool() keeps the return value a plain
    # Python bool instead of a numpy scalar.
    return bool(np.all(mask_in_high_mip == 0))

def __call__(self, chunk):
def maskout(self, chunk):
if self.verbose:
print('mask out chunk using {} in mip {}'.format(
self.volume_path, self.mask_mip))
Expand All @@ -41,12 +60,6 @@ def __call__(self, chunk):

chunk_bbox = Bbox.from_slices(chunk.slices[-3:])
mask_in_high_mip = self._read_mask(chunk_bbox)
# this is a cloudvolume VolumeCutout rather than a normal numpy array
# which will make np.alltrue(mask_in_high_mip == 0) to be
# VolumeCutout(False) rather than False
mask_in_high_mip = np.asarray(mask_in_high_mip)
if self.inverse:
mask_in_high_mip = (mask_in_high_mip==0)

if np.alltrue(mask_in_high_mip == 0):
warn('the mask is all black, mask all the voxels directly')
Expand Down Expand Up @@ -100,6 +113,12 @@ def _read_mask(self, chunk_bbox):
mask = self.mask_vol[mask_slices[::-1]]
mask = np.transpose(mask)
mask = np.squeeze(mask, axis=0)


# this is a cloudvolume VolumeCutout rather than a normal numpy array
# which will make np.alltrue(mask_in_high_mip == 0) to be
# VolumeCutout(False) rather than False
mask = np.asarray(mask)
if self.inverse:
mask = (mask==0)
return mask

8 changes: 7 additions & 1 deletion chunkflow/operators/save.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,13 @@ def __init__(self, volume_path: str, mip: int,
mip=mip,
progress=verbose)


def create_chunk_with_zeros(self, bbox):
    """Create a fake all zero chunk.

    Args:
        bbox: bounding box whose ``size3`` and ``minpt`` are read
            (presumably a ``cloudvolume.lib.Bbox`` — confirm with callers).

    Returns:
        Chunk: a zero-filled chunk of dtype ``self.volume.dtype``. Its
        shape is ``self.volume.num_channels`` followed by the reversed
        3D size of ``bbox``, and its global offset is ``bbox.minpt``.
    """
    # cloudvolume's Bbox exposes size3 as a method, so subscripting the
    # attribute directly raises TypeError. Call it when it is callable,
    # while still accepting bbox-like objects that store a plain value.
    size3 = bbox.size3() if callable(bbox.size3) else bbox.size3
    shape = (self.volume.num_channels, *size3[::-1])
    arr = np.zeros(shape, dtype=self.volume.dtype)
    # NOTE(review): arr has a leading channel axis while bbox.minpt is
    # presumably 3D — confirm Chunk accepts an offset of that length.
    return Chunk(arr, global_offset=bbox.minpt)

def __call__(self, chunk, log={'timer':{}}, output_bbox=None):
start = time.time()

Expand Down

0 comments on commit bffab4f

Please sign in to comment.