Skip to content

Commit

Permalink
Merge d74bb70 into b050284
Browse files Browse the repository at this point in the history
  • Loading branch information
jingpengw committed Aug 15, 2019
2 parents b050284 + d74bb70 commit 462cfa8
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions chunkflow/flow/flow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python
import click
from functools import wraps
from functools import wraps, update_wrapper
from time import time
import numpy as np
from cloudvolume.lib import Bbox
Expand Down Expand Up @@ -97,13 +97,24 @@ def operator(func):
@wraps(func)
def wrapper(*args, **kwargs):
def operator(stream):
if stream:
for item in stream:
return func(item, *args, **kwargs)
return func(stream, *args, **kwargs)
return operator

return wrapper

def generator(func):
"""Similar to the :func:`operator` but passes through old values unchanged and does not pass through the values as parameter.
"""

@operator
def new_func(stream, *args, **kwargs):
for item in stream:
yield item
for item in func(*args, **kwargs):
yield item

return update_wrapper(new_func, func)

@main.command('create-chunk')
@click.option(
'--size',
Expand All @@ -122,7 +133,7 @@ def operator(stream):
nargs=3,
default=(0, 0, 0),
help='offset in voxel number.')
@operator
@generator
def create_chunk(size, dtype, voxel_offset):
"""Create a fake chunk for easy test."""
create_chunk_operator = CreateChunkOperator()
Expand Down Expand Up @@ -150,7 +161,7 @@ def create_chunk(size, dtype, voxel_offset):
type=str,
default='chunk',
help='chunk name in the global state')
@operator
@generator
def read_file(name, file_name, offset, chunk_name):
"""Read tiff files."""
read_tif_operator = ReadTIFOperator()
Expand Down Expand Up @@ -184,7 +195,7 @@ def read_file(name, file_name, offset, chunk_name):
type=str,
default='chunk',
help='chunk name in the global state')
@operator
@generator
def read_h5(name: str, file_name: str, dataset_path: str, offset: tuple, chunk_name: str):
"""Read HDF5 files."""
read_h5_operator = ReadH5Operator()
Expand All @@ -200,7 +211,7 @@ def read_h5(name: str, file_name: str, dataset_path: str, offset: tuple, chunk_n
'--offset', type=int, nargs=3, default=(0, 0, 0), help='output offset')
@click.option(
'--shape', type=int, nargs=3, default=(0, 0, 0), help='output shape')
@operator
@generator
def generate_task(offset, shape):
"""Create a task."""
# no queue name specified
Expand All @@ -221,7 +232,7 @@ def generate_task(offset, shape):
help=
'visibility timeout of sqs queue; default is using the timeout of the queue.'
)
@operator
@generator
def fetch_task(queue_name, visibility_timeout):
"""[generator] Fetch task from queue."""
queue = SQSQueue(queue_name, visibility_timeout=visibility_timeout)
Expand Down Expand Up @@ -286,7 +297,7 @@ def write_tif(tasks, name, file_name):
@operator
def save_pngs(tasks, name, output_path):
"""Save as 2D PNG images."""
state['operators'][name] = SaveImagesOperator(
state['operators'][name] = SavePNGsOperator(
output_path=output_path, name=name)
for task in tasks:
handle_task_skip(task, name)
Expand Down

0 comments on commit 462cfa8

Please sign in to comment.