diff --git a/chunkflow/flow.py b/chunkflow/flow.py index fbd8f4a3..9fce1697 100755 --- a/chunkflow/flow.py +++ b/chunkflow/flow.py @@ -18,6 +18,20 @@ } +def initialize_task(): + # initialize task with some default values + task = { + 'skip': False, + 'log': {'timer': {}} + } + return task + + +def handle_task_skip(task, name): + if task['skip'] and task['skip_to']==name: + # have already skipped to target operator + task['skip'] = False + def default_none(ctx, param, value): """ click currently can not use None with tuple type @@ -96,9 +110,10 @@ def new_func(stream, *args, **kwargs): @generator def create_chunk_cmd(size, dtype, voxel_offset): """[generator] Create a fake chunk for easy test.""" + task = initialize_task() create_chunk_operator = CreateChunkOperator() - chunk = create_chunk_operator(size=size, dtype=dtype, voxel_offset=voxel_offset) - yield {'chunk': chunk} + task['chunk'] = create_chunk_operator(size=size, dtype=dtype, voxel_offset=voxel_offset) + yield task @cli.command('read-file') @@ -109,23 +124,10 @@ def create_chunk_cmd(size, dtype, voxel_offset): @generator def read_file_cmd(file_name, offset): """[generator] Read HDF5 and tiff files.""" + task = initialize_task() read_file_operator = ReadFileOperator() - chunk = read_file_operator(file_name, global_offset=offset) - yield {'chunk': chunk} - - -@cli.command('write-h5') -@click.option('--name', type=str, default='write-h5', help='name of this operator') -@click.option('--file-name', type=str, required=True, - help='file name of hdf5 file, the extention should be .h5') -@operator -def write_h5_cmd(tasks, name, file_name): - """[operator] Write chunk to HDF5 file.""" - state['operators'][name] = WriteH5Operator() - for task in tasks: - state['operators'][name](task['chunk'], file_name) - # keep the pipeline going - yield task + task['chunk'] = read_file_operator(file_name, global_offset=offset) + yield task @cli.command('generate-task') @@ -137,7 +139,7 @@ def write_h5_cmd(tasks, name, file_name): @generator def generate_task_cmd(queue_name, offset, shape, visibility_timeout): """[generator] Create task or fetch task from queue.""" - task = {'log': {'timer': {}}} + task = initialize_task() if not queue_name: # no queue name specified # will only run one task @@ -159,17 +161,35 @@ def generate_task_cmd(queue_name, offset, shape, visibility_timeout): yield task +@cli.command('write-h5') +@click.option('--name', type=str, default='write-h5', help='name of this operator') +@click.option('--file-name', type=str, required=True, + help='file name of hdf5 file, the extention should be .h5') +@operator +def write_h5_cmd(tasks, name, file_name): + """[operator] Write chunk to HDF5 file.""" + state['operators'][name] = WriteH5Operator() + for task in tasks: + handle_task_skip(task, name) + if not task['skip']: + state['operators'][name](task['chunk'], file_name) + # keep the pipeline going + yield task + + @cli.command('delete-task-in-queue') @click.option('--name', type=str, default='delete-task-in-queue', help='name of this operator') @operator def delete_task_in_queue_cmd(tasks, name): """[operator] Delete the task in queue.""" for task in tasks: - queue = task['queue'] - task_handle = task['task_handle'] - queue.delete(task_handle) - if state['verbose']: - print('deleted task {} in queue: {}'.format(task_handle, queue)) + handle_task_skip(task, name) + if not task['skip']: + queue = task['queue'] + task_handle = task['task_handle'] + queue.delete(task_handle) + if state['verbose']: + print('deleted task {} in queue: {}'.format(task_handle, queue)) @cli.command('cutout') @@ -199,10 +219,12 @@ def cutout_cmd(tasks, name, volume_path, mip, expand_margin_size, name=name) for task in tasks: - start = time() - task['chunk'] = state['operators'][name](task['output_bbox']) - task['log']['timer'][name] = time() - start - task['cutout_volume_path'] = volume_path + handle_task_skip(task, name) + if not task['skip']: + start = time() + task['chunk'] = state['operators'][name](task['output_bbox']) + task['log']['timer'][name] = time() - start + task['cutout_volume_path'] = volume_path yield task @@ -231,9 +253,11 @@ def normalize_contrast_contrast_cmd(tasks, name, levels_path, mip, clip_fraction levels_path, mip, clip_fraction, minval=minval, maxval=maxval, name=name) for task in tasks: - start = time() - task['chunk'] = state['operators'][name](task['chunk']) - task['log']['timer'][name] = time() - start + handle_task_skip(task, name) + if not task['skip']: + start = time() + task['chunk'] = state['operators'][name](task['chunk']) + task['log']['timer'][name] = time() - start yield task @@ -268,12 +292,14 @@ def inference_cmd(tasks, name, convnet_model, convnet_weight_path, patch_size, verbose=state['verbose'], name=name) for task in tasks: - if 'log' not in task: - task['log'] = {'timer': {}} - start = time() - task['chunk'] = state['operators'][name](task['chunk']) - task['log']['timer'][name] = time() - start - task['log']['compute_device'] = state['operators'][name].compute_device + handle_task_skip(task, name) + if not task['skip']: + if 'log' not in task: + task['log'] = {'timer': {}} + start = time() + task['chunk'] = state['operators'][name](task['chunk']) + task['log']['timer'][name] = time() - start + task['log']['compute_device'] = state['operators'][name].compute_device yield task @@ -287,23 +313,39 @@ def inference_cmd(tasks, name, convnet_model, convnet_weight_path, patch_size, @click.option('--fill-missing/--no-fill-missing', default=False, help='fill missing blocks with black or not. ' + 'default is False.') +@click.option('--check-all-zero/--maskout', default=False, + help='default is doing maskout. '+ + 'check all zero will return boolean result.') +@click.option('--skip-to', type=str, default='save', + help='skip to a operator') @operator -def mask_cmd(tasks, name, volume_path, mip, inverse, fill_missing): +def mask_cmd(tasks, name, volume_path, mip, inverse, fill_missing, + check_all_zero, skip_to): """[operator] Mask the chunk. The mask could be in higher mip level and we will automatically upsample it to the same mip level with chunk. """ - state['operators'][name] = MaskOperator(volume_path, mip, state['mip'], - inverse=inverse, - fill_missing=fill_missing, - verbose=state['verbose'], - name=name) + state['operators'][name] = MaskOperator(volume_path, mip, state['mip'], + inverse=inverse, + fill_missing=fill_missing, + check_all_zero=check_all_zero, + verbose=state['verbose'], + name=name) for task in tasks: - start = time() - task['chunk'] = state['operators'][name](task['chunk']) - # Note that mask operation could be used several times, - # this will only record the last masking operation - task['log']['timer'][name] = time() - start + handle_task_skip(task, name) + if not task['skip']: + start = time() + if check_all_zero: + # skip following operators since the mask is all zero after required inverse + task['skip'] = state['operators'][name].is_all_zero(task['output_bbox']) + if task['skip']: + print('the mask of {} is all zero, will skip to {}'.format(name, skip_to)) + task['skip_to'] = skip_to + else: + task['chunk'] = state['operators'][name](task['chunk']) + # Note that mask operation could be used several times, + # this will only record the last masking operation + task['log']['timer'][name] = time() - start yield task @@ -319,11 +361,14 @@ def crop_margin_cmd(tasks, name, margin_size): state['operators'][name] = CropMarginOperator(margin_size=margin_size, verbose=state['verbose'], name=name) + for task in tasks: - start = time() - task['chunk'] = state['operators'][name](task['chunk'], - output_bbox=task['output_bbox']) - task['log']['timer'][name] = time() - start + handle_task_skip(task, name) + if not task['skip']: + start = time() + task['chunk'] = state['operators'][name](task['chunk'], + output_bbox=task['output_bbox']) + task['log']['timer'][name] = time() - start yield task @@ -348,11 +393,18 @@ def save_cmd(tasks, name, volume_path, upload_log, nproc, create_thumbnail): verbose=state['verbose'], name=name) for task in tasks: - # the time elapsed was recorded internally - state['operators'][name]( - task['chunk'], log=task.get('log', {'timer':{}}), - output_bbox=task.get('output_bbox', None)) - task['output_volume_path'] = volume_path + # we got a special case for handling skip + if task['skip'] and task['skip_to']==name: + task['skip'] = False + # create fake chunk to save + task['chunk'] = state['operators'][name].create_chunk_with_zeros(task['output_bbox']) + + if not task['skip']: + # the time elapsed was recorded internally + state['operators'][name]( + task['chunk'], log=task.get('log', {'timer':{}}), + output_bbox=task.get('output_bbox', None)) + task['output_volume_path'] = volume_path yield task @@ -365,7 +417,9 @@ def cloud_watch_cmd(tasks, name, log_name): state['operators'][name]=CloudWatchOperator(log_name=log_name, name=name, verbose=state['verbose']) for task in tasks: - state['operators'][name](task['log']) + handle_task_skip(task, name) + if not task['skip']: + state['operators'][name](task['log']) yield task @@ -376,7 +430,9 @@ def view_cmd(tasks, name): """[operator] Visualize the chunk using cloudvolume view in browser.""" state['operators'][name] = ViewOperator(name=name) for task in tasks: - state['operators'][name](task['chunk']) + handle_task_skip(task, name) + if not task['skip']: + state['operators'][name](task['chunk']) yield task @@ -389,7 +445,9 @@ def neuroglancer_cmd(tasks, name, voxel_size): """[operator] Visualize the chunk using neuroglancer.""" state['operators'][name] = NeuroglancerViewOperator(name=name) for task in tasks: - state['operators'][name]([task['chunk'],], voxel_size=voxel_size) + handle_task_skip(task, name) + if not task['skip']: + state['operators'][name]([task['chunk'],], voxel_size=voxel_size) yield task diff --git a/chunkflow/operators/mask.py b/chunkflow/operators/mask.py index 6574fb47..4843f95d 100644 --- a/chunkflow/operators/mask.py +++ b/chunkflow/operators/mask.py @@ -4,12 +4,14 @@ from cloudvolume import CloudVolume from cloudvolume.lib import Bbox +from chunkflow.chunk import Chunk from .operator_base import OperatorBase class MaskOperator(OperatorBase): def __init__(self, volume_path: str, mask_mip: int, chunk_mip: int, - inverse: bool=False, fill_missing: bool=False, + inverse: bool=False, fill_missing: bool=False, + check_all_zero=False, verbose: bool=True, name: str='mask'): super().__init__(name=name) @@ -18,6 +20,7 @@ def __init__(self, volume_path: str, mask_mip: int, chunk_mip: int, self.inverse = inverse self.verbose = verbose self.volume_path = volume_path + self.check_all_zero = check_all_zero self.mask_vol = CloudVolume( volume_path, @@ -29,8 +32,24 @@ def __init__(self, volume_path: str, mask_mip: int, chunk_mip: int, if verbose: print("mask chunk at mip {} using {}".format( mask_mip, volume_path)) + + def __call__(self, x): + if self.check_all_zero: + assert isinstance(x, Bbox) + return self.is_all_zero(x) + else: + assert isinstance(x, Chunk) + return self.maskout(x) + + def is_all_zero(self, bbox): + mask_in_high_mip = self._read_mask(bbox) + if np.alltrue(mask_in_high_mip == 0): + # mask is all zero + return True + else: + return False - def __call__(self, chunk): + def maskout(self, chunk): if self.verbose: print('mask out chunk using {} in mip {}'.format( self.volume_path, self.mask_mip)) @@ -41,12 +60,6 @@ def __call__(self, chunk): chunk_bbox = Bbox.from_slices(chunk.slices[-3:]) mask_in_high_mip = self._read_mask(chunk_bbox) - # this is a cloudvolume VolumeCutout rather than a normal numpy array - # which will make np.alltrue(mask_in_high_mip == 0) to be - # VolumeCutout(False) rather than False - mask_in_high_mip = np.asarray(mask_in_high_mip) - if self.inverse: - mask_in_high_mip = (mask_in_high_mip==0) if np.alltrue(mask_in_high_mip == 0): warn('the mask is all black, mask all the voxels directly') @@ -100,6 +113,12 @@ def _read_mask(self, chunk_bbox): mask = self.mask_vol[mask_slices[::-1]] mask = np.transpose(mask) mask = np.squeeze(mask, axis=0) - + + # this is a cloudvolume VolumeCutout rather than a normal numpy array + # which will make np.alltrue(mask_in_high_mip == 0) to be + # VolumeCutout(False) rather than False + mask = np.asarray(mask) + if self.inverse: + mask = (mask==0) return mask diff --git a/chunkflow/operators/save.py b/chunkflow/operators/save.py index 86aaa5eb..803c3b6d 100644 --- a/chunkflow/operators/save.py +++ b/chunkflow/operators/save.py @@ -50,7 +50,13 @@ def __init__(self, volume_path: str, mip: int, mip=mip, progress=verbose) - + def create_chunk_with_zeros(self, bbox): + """Create a fake all zero chunk""" + shape = (self.volume.num_channels, *bbox.size3[::-1]) + arr = np.zeros(shape, dtype=self.volume.dtype) + chunk = Chunk(arr, global_offset=bbox.minpt) + return chunk + def __call__(self, chunk, log={'timer':{}}, output_bbox=None): start = time.time()