Skip to content

Commit

Permalink
Merge 59f2075 into dc91fd8
Browse files Browse the repository at this point in the history
  • Loading branch information
jingpengw committed Apr 27, 2020
2 parents dc91fd8 + 59f2075 commit 2d46511
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 197 deletions.
19 changes: 1 addition & 18 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,8 @@ install:
- pip install pytest-cov

script:
- coverage run -a --source=./chunkflow chunkflow generate-tasks -c 0 0 0 -s 0 0 0 -g 1 1 1
- coverage run -a --source=./chunkflow chunkflow create-chunk -o seg create-chunk -o gt evaluate-segmentation -s seg -g gt
- coverage run -a --source=./chunkflow chunkflow log-summary -l tests/data/log
- coverage run -a --source=./chunkflow chunkflow create-chunk write-h5 --file-name=/tmp/img.h5 connected-components --threshold 128 write-tif --file-name=/tmp/seg.h5
- if test -f /tmp/img.h5 ; then echo "File found"; else exit 1; fi
- coverage run -a --source=./chunkflow chunkflow read-h5 --file-name=/tmp/img.h5
- coverage run -a --source=./chunkflow chunkflow --dry-run --verbose 1 setup-env -l "gs://my/path" --volume-start 2002 25616 12304 --volume-stop 2068 26128 12816 --max-ram-size 14 --input-patch-size 20 128 128 --output-patch-size 16 96 96 --output-patch-overlap 6 32 32 --channel-num 3 --dtype float32 -m 0 --encoding raw --voxel-size 45 16 16 --max-mip 5
- coverage run -a --source=./chunkflow chunkflow create-chunk --size 36 448 448 inference --input-patch-size 20 256 256 --patch-num 2 2 2 --framework identity --batch-size 3 cloud-watch --log-name chunkflow-test
- coverage run -a --source=./chunkflow chunkflow create-chunk --all-zero --size 36 448 448 inference --input-patch-size 20 256 256 --patch-num 2 2 2 --framework identity --batch-size 3 cloud-watch --log-name chunkflow-test
- coverage run -a --source=./chunkflow chunkflow create-chunk --size 36 448 448 --dtype "uint32" connected-components mask-out-objects -d 50 -s "2,3,4" skeletonize --voxel-size 1 1 1 --output-path file:///tmp/test/skeleton mesh -t ply -v 1 1 1
- coverage run -a --source=./chunkflow chunkflow create-chunk --size 36 448 448 inference
--input-patch-size 20 256 256 --patch-num 2 2 2 --framework "general" --convnet-model "chunkflow/chunk/image/convnet/patch/general_identity.py" --batch-size 3 cloud-watch --log-name chunkflow-test

# the coverage run commands are not working correctly in travis. they works locally though.
# I have created an topic to discuss about this issue.
# https://travis-ci.community/t/python-coverage-run-get-lower-coverage-rate-in-travis-than-local/7692/2
- coverage report

# unit tests
- bash tests/command_line.sh
- pytest --cov-append --cov=./chunkflow ./tests --verbose

after_success:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ After installation, You can simply type `chunkflow` and it will list all the ope
| Operator Name | Function |
| --------------- | -------- |
| agglomerate | Watershed and agglomeration to segment affinity map |
| aggregate-skeleton-fragments| Merge skeleton fragments from chunks |
| channel-voting | Vote across channels of semantic map |
| cloud-watch | Realtime speedometer in AWS CloudWatch |
| connected-components | Threshold the boundary map to get a segmentation |
Expand Down
67 changes: 67 additions & 0 deletions chunkflow/flow/aggregate_skeleton_fragments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os
import re
from collections import defaultdict
from time import sleep

from cloudvolume.storage import Storage
from cloudvolume import PrecomputedSkeleton

import kimimaro

from .base import OperatorBase


class AggregateSkeletonFragmentsOperator(OperatorBase):
"""Merge skeleton fragments for Neuroglancer visualization."""
def __init__(self,
fragments_path: str,
output_path: str,
name: str = 'aggregate-skeleton-fragments',
verbose: bool = True):
"""
Parameters
------------
fragments_path:
path to store fragment files
output_path:
save the merged skeleton file here.
"""
super().__init__(name=name, verbose=verbose)
self.fragments_storage = Storage(fragments_path)
self.output_storage = Storage(output_path)

def __call__(self, prefix: str):
if self.verbose:
print('aggregate skeletons with prefix of ', prefix)

id2filenames = defaultdict(list)
for filename in self.fragments_storage.list_files(prefix=prefix):
filename = os.path.basename(filename)
# `match` implies the beginning (^). `search` matches whole string
matches = re.search(r'(\d+):', filename)

if not matches:
continue

# skeleton ID
skl_id = int(matches.group(0)[:-1])
id2filenames[skl_id].append(filename)

for skl_id, filenames in id2filenames.items():
if self.verbose:
print('skeleton id: ', skl_id)
# print('filenames: ', filenames)
frags = self.fragments_storage.get_files(filenames)
frags = [PrecomputedSkeleton.from_precomputed(x['content']) for x in frags]
skel = PrecomputedSkeleton.simple_merge(frags).consolidate()
skel = kimimaro.postprocess(
skel,
dust_threshold=1000,
tick_threshold=3500
)
self.output_storage.put_file(
file_path=str(skl_id),
content=skel.to_precomputed(),
)
# the last few hundred files will not be uploaded without sleeping!
sleep(0.01)
222 changes: 46 additions & 176 deletions chunkflow/flow/flow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
#!/usr/bin/env python
import os
import sys
from functools import update_wrapper, wraps
from time import time

Expand All @@ -9,8 +7,6 @@

from cloudvolume.lib import Bbox, Vec, yellow
# from cloudvolume.datasource.precomputed.metadata import PrecomputedMetadata
from cloudvolume import CloudVolume
from cloudvolume.storage import SimpleStorage

from chunkflow.lib.aws.sqs_queue import SQSQueue
from chunkflow.chunk import Chunk
Expand All @@ -20,6 +16,7 @@

# import operator functions
from .agglomerate import AgglomerateOperator
from .aggregate_skeleton_fragments import AggregateSkeletonFragmentsOperator
from .cloud_watch import CloudWatchOperator
from .create_bounding_boxes import create_bounding_boxes
from .custom_operator import CustomOperator
Expand All @@ -35,6 +32,7 @@
from .normalize_section_shang import NormalizeSectionShangOperator
from .save import SaveOperator
from .save_pngs import SavePNGsOperator
from .setup_env import setup_environment
from .skeletonize import SkeletonizeOperator
from .view import ViewOperator

Expand Down Expand Up @@ -217,176 +215,19 @@ def generate_tasks(layer_path, mip, roi_start, chunk_size,
@click.option('--overwrite-info/--no-overwrite-info', default=False,
help='normally we should avoid overwriting info file to avoid errors.')
@generator
def setup_env(volume_start, volume_stop, volume_size, layer_path, max_ram_size,
output_patch_size, input_patch_size, channel_num, dtype,
def setup_env(volume_start, volume_stop, volume_size, layer_path,
max_ram_size, output_patch_size, input_patch_size, channel_num, dtype,
output_patch_overlap, crop_chunk_margin, mip, thumbnail_mip, max_mip,
queue_name, visibility_timeout, thumbnail, encoding, voxel_size, overwrite_info):
"""Prepare storage info files and produce tasks."""
assert not (volume_stop is None and volume_size is None)
if isinstance(volume_start, tuple):
volume_start = Vec(*volume_start)
if isinstance(volume_stop, tuple):
volume_stop = Vec(*volume_stop)
if isinstance(volume_size, tuple):
volume_size = Vec(*volume_size)

if input_patch_size is None:
input_patch_size = output_patch_size

if volume_size:
assert volume_stop is None
volume_stop = volume_start + volume_size
else:
volume_size = volume_stop - volume_start
print('\noutput volume start: ', volume_start)
print('output volume stop: ', volume_stop)
print('output volume size: ', volume_size)

if output_patch_overlap is None:
# use 50% patch overlap in default
output_patch_overlap = tuple(s//2 for s in output_patch_size)
assert output_patch_overlap[1] == output_patch_overlap[2]

if crop_chunk_margin is None:
crop_chunk_margin = output_patch_overlap
assert crop_chunk_margin[1] == crop_chunk_margin[2]
print('output crop chunk margin: ', crop_chunk_margin)

if thumbnail:
# thumnail requires maximum mip level of 5
thumbnail_mip = max(thumbnail_mip, 5)

patch_stride = tuple(s - o for s, o in zip(
output_patch_size, output_patch_overlap))
# total number of voxels per patch in one stride
patch_voxel_num = np.product( patch_stride )
# use half of the maximum ram size to store output buffer
ideal_total_patch_num = int(max_ram_size*1e9/2/4/channel_num/patch_voxel_num)
# the xy size should be the same
assert output_patch_size[1] == output_patch_size[2]
# compute the output chunk/block size in cloud storage
# assume that the crop margin size is the same with the patch overlap
patch_num_start = int(ideal_total_patch_num ** (1./3.) / 2)
patch_num_stop = patch_num_start * 3

# find the patch number solution with minimum cost by bruteforce search
cost = sys.float_info.max
patch_num = None
# patch number in x and y
max_factor = 2**max_mip
factor = 2**mip
for pnxy in range(patch_num_start, patch_num_stop):
if (pnxy * patch_stride[2] + output_patch_overlap[2] -
2 * crop_chunk_margin[2]) % max_factor != 0:
continue
# patch number in z
for pnz in range(patch_num_start, patch_num_stop):
if (pnz * patch_stride[0] + output_patch_overlap[0] -
2 * crop_chunk_margin[0]) % factor != 0:
continue
current_cost = ( pnxy * pnxy * pnz / ideal_total_patch_num - 1) ** 2 #+ (pnxy / pnz - 1) ** 2
if current_cost < cost:
cost = current_cost
patch_num = (pnz, pnxy, pnxy)

print('\ninput patch size: ', input_patch_size)
print('output patch size: ', output_patch_size)
print('output patch overlap: ', output_patch_overlap)
print('output patch stride: ', patch_stride)
print('patch number: ', patch_num)

assert mip>=0
block_mip = (mip + thumbnail_mip) // 2
block_factor = 2 ** block_mip

output_chunk_size = tuple(n*s+o-2*c for n, s, o, c in zip(
patch_num, patch_stride, output_patch_overlap, crop_chunk_margin))

input_chunk_size = tuple(ocs + ccm*2 + ips - ops for ocs, ccm, ips, ops in zip(
output_chunk_size, crop_chunk_margin,
input_patch_size, output_patch_size))

expand_margin_size = tuple((ics - ocs) // 2 for ics, ocs in zip(
input_chunk_size, output_chunk_size))

input_chunk_start = tuple(vs - ccm - (ips-ops)//2 for vs, ccm, ips, ops in zip(
volume_start, crop_chunk_margin, input_patch_size, output_patch_size))

block_size = (output_chunk_size[0]//factor,
output_chunk_size[1]//block_factor,
output_chunk_size[2]//block_factor)

print('\ninput chunk size: ', input_chunk_size)
print('input volume start: ', input_chunk_start)
print('output chunk size: ', output_chunk_size)
print('cutout expand margin size: ', expand_margin_size)

print('output volume start: ', volume_start)
print('block size: ', block_size)
print('RAM size of each block: ',
np.prod(output_chunk_size)/1024/1024/1024*4*channel_num, ' GB')
print('voxel utilization: ',
np.prod(output_chunk_size)/np.prod(patch_num)/np.prod(output_patch_size))

if not state['dry_run']:
storage = SimpleStorage(layer_path)
thumbnail_layer_path = os.path.join(layer_path, 'thumbnail')
thumbnail_storage = SimpleStorage(thumbnail_layer_path)

if not overwrite_info:
print('\ncheck that we are not overwriting existing info file.')
assert storage.exists('info')
assert thumbnail_storage.exists('info')

print('create and upload info file to ', layer_path)
# Note that cloudvolume use fortran order rather than C order
info = CloudVolume.create_new_info(channel_num, layer_type='image',
data_type=dtype,
encoding=encoding,
resolution=voxel_size[::-1],
voxel_offset=volume_start[::-1],
volume_size=volume_size[::-1],
chunk_size=block_size[::-1],
max_mip=mip)
vol = CloudVolume(layer_path, info=info)
if overwrite_info:
vol.commit_info()

thumbnail_factor = 2**thumbnail_mip
thumbnail_block_size = (output_chunk_size[0]//factor,
output_chunk_size[1]//thumbnail_factor,
output_chunk_size[2]//thumbnail_factor)
print('thumbnail block size: ', thumbnail_block_size)
thumbnail_info = CloudVolume.create_new_info(1, layer_type='image',
data_type='uint8',
encoding='raw',
resolution=voxel_size[::-1],
voxel_offset=volume_start[::-1],
volume_size=volume_size[::-1],
chunk_size=thumbnail_block_size[::-1],
max_mip=thumbnail_mip)
thumbnail_vol = CloudVolume(thumbnail_layer_path, info=thumbnail_info)
if overwrite_info:
thumbnail_vol.commit_info()

print('create a list of bounding boxes...')
roi_start = (volume_start[0],
volume_start[1]//factor,
volume_start[2]//factor)
roi_size = (volume_size[0],
volume_size[1]//factor,
volume_size[2]//factor)
roi_stop = tuple(s+z for s, z in zip(roi_start, roi_size))

# create bounding boxes and ingest to queue
bboxes = create_bounding_boxes(output_chunk_size,
roi_start=roi_start, roi_stop=roi_stop,
verbose=state['verbose'])
print('total number of tasks: ', len(bboxes))

if state['verbose'] > 1:
print('bounding boxes: ', bboxes)

queue_name, visibility_timeout, thumbnail, encoding, voxel_size,
overwrite_info):

bboxes = setup_environment(
state['dry_run'], volume_start, volume_stop, volume_size, layer_path,
max_ram_size, output_patch_size, input_patch_size, channel_num, dtype,
output_patch_overlap, crop_chunk_margin, mip, thumbnail_mip, max_mip,
queue_name, visibility_timeout, thumbnail, encoding, voxel_size,
overwrite_info, state['verbose'])

if queue_name is not None and not state['dry_run']:
queue = SQSQueue(queue_name, visibility_timeout=visibility_timeout)
queue.send_message_list(bboxes)
Expand Down Expand Up @@ -485,6 +326,35 @@ def agglomerate(tasks, name, threshold, aff_threshold_low, aff_threshold_high,
yield task


@main.command('aggregate-skeleton-fragments')
@click.option('--name', type=str, default='aggregate-skeleton-fragments',
help='name of operator')
@click.option('--input-name', '-i', type=str, default='prefix',
help='input prefix name in task stream.')
@click.option('--prefix', '-p', type=str, default=None,
help='prefix of skeleton fragments.')
@click.option('--fragments-path', '-f', type=str, required=True,
help='storage path of skeleton fragments.')
@click.option('--output-path', '-o', type=str, default=None,
help='storage path of aggregated skeletons.')
@operator
def aggregate_skeleton_fragments(tasks, name, input_name, prefix, fragments_path, output_path):
"""Merge skeleton fragments."""
if output_path is None:
output_path = fragments_path

state['operators'][name] = AggregateSkeletonFragmentsOperator(fragments_path, output_path)
if prefix:
state['operators'][name](prefix)
else:
for task in tasks:
start = time()
state['operators'][name](task[input_name])
task['log']['timer'][name] = time() - start
yield task



@main.command('create-chunk')
@click.option('--name',
type=str,
Expand Down Expand Up @@ -1146,12 +1016,12 @@ def mask(tasks, name, input_chunk_name, output_chunk_name, volume_path,
@click.option('--selected-obj-ids', '-s', type=str, default=None,
help="""a list of segment ids to mesh. This is for sparse meshing.
The ids should be separated by comma without space, such as "34,56,78,90"
it can also be a json file contains a list of ids. The json file should be
put in the output_path.""")
it can also be a json file contains a list of ids. The json file path should
contain protocols, such as "gs://bucket/my/json/file/path.""")
@operator
def mask_out_objects(tasks, name, input_chunk_name, output_chunk_name,
dust_size_threshold, selected_obj_ids):

"""Mask out objects in a segmentation chunk."""
operator = MaskOutObjectsOperator(
dust_size_threshold,
selected_obj_ids,
Expand Down

0 comments on commit 2d46511

Please sign in to comment.