Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,4 @@
+# 0.0.2 2024-03-xx
+
+- Merged 1D and 2D encode steps into one, and changed rate reporting to bytes
+- Added --max-memory option for encode
9 changes: 9 additions & 0 deletions bio2zarr/cli.py
@@ -109,6 +109,13 @@ def mkschema(if_path):
         "schema tuning."
     ),
 )
+@click.option(
+    "-M",
+    "--max-memory",
+    type=int,
+    default=None,
+    help="An approximate bound on overall memory usage in megabytes",
+)
 @worker_processes
 def encode(
     if_path,
@@ -118,6 +125,7 @@ def encode(
     chunk_length,
     chunk_width,
     max_variant_chunks,
+    max_memory,
     worker_processes,
 ):
     """
@@ -132,6 +140,7 @@
         chunk_width=chunk_width,
         max_v_chunks=max_variant_chunks,
         worker_processes=worker_processes,
+        max_memory=max_memory,
         show_progress=True,
     )
 
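A note on units: the option's help text says megabytes, so the value has to be converted to a byte budget wherever `encode()` enforces it. A minimal sketch of that conversion (the function name is hypothetical; this diff does not show how `vcf2zarr.encode` consumes the value):

```python
def max_memory_to_bytes(max_memory):
    """Interpret the --max-memory CLI value; None means no bound."""
    if max_memory is None:
        return 2**63  # effectively unbounded
    return max_memory * 1024**2  # megabytes -> bytes
```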
27 changes: 21 additions & 6 deletions bio2zarr/core.py
@@ -118,7 +118,7 @@ def flush(self):
 
 def sync_flush_1d_array(np_buffer, zarr_array, offset):
     zarr_array[offset : offset + np_buffer.shape[0]] = np_buffer
-    update_progress(1)
+    update_progress(np_buffer.nbytes)
 
 
 def sync_flush_2d_array(np_buffer, zarr_array, offset):
@@ -127,12 +127,15 @@ def sync_flush_2d_array(np_buffer, zarr_array, offset):
     # encoder implementations.
     s = slice(offset, offset + np_buffer.shape[0])
     chunk_width = zarr_array.chunks[1]
+    # TODO use zarr chunks here to support non-uniform chunking later
+    # and for simplicity
     zarr_array_width = zarr_array.shape[1]
     start = 0
     while start < zarr_array_width:
         stop = min(start + chunk_width, zarr_array_width)
-        zarr_array[s, start:stop] = np_buffer[:, start:stop]
-        update_progress(1)
+        chunk_buffer = np_buffer[:, start:stop]
+        zarr_array[s, start:stop] = chunk_buffer
+        update_progress(chunk_buffer.nbytes)
         start = stop
 
 
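The two flush helpers above now pass byte counts to `update_progress`, which is defined elsewhere in core.py and not shown in this diff. A minimal sketch of how such a counter can work across worker processes (the names match the diff; the `multiprocessing.Value` implementation is an assumption):

```python
import multiprocessing

# Assumed implementation: a shared unsigned 64-bit counter that workers
# increment and the progress thread polls.
_progress_counter = multiprocessing.Value("Q", 0)

def update_progress(inc):
    # inc is now a byte count rather than a chunk count, so a tqdm bar
    # configured with unit_scale=True can render rates like "120MB/s".
    with _progress_counter.get_lock():
        _progress_counter.value += inc

def set_progress(value):
    with _progress_counter.get_lock():
        _progress_counter.value = value

def get_progress():
    return _progress_counter.value
```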
@@ -177,15 +180,15 @@ def __init__(self, worker_processes=1, progress_config=None):
         self.executor = cf.ProcessPoolExecutor(
             max_workers=worker_processes,
         )
-        self.futures = []
+        self.futures = set()
 
         set_progress(0)
         if progress_config is None:
             progress_config = ProgressConfig()
         self.progress_config = progress_config
         self.progress_bar = tqdm.tqdm(
             total=progress_config.total,
-            desc=f"{progress_config.title:>9}",
+            desc=f"{progress_config.title:>7}",
             unit_scale=True,
             unit=progress_config.units,
             smoothing=0.1,
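With byte totals and `unit_scale=True`, tqdm formats the rate in human-readable units on its own. A standalone illustration of a bar configured like the one above (the total and sleep are made up):

```python
import time
import tqdm

# 10 MiB total, updated 1 MiB at a time; prints a scaled rate such as "98.3MB/s".
bar = tqdm.tqdm(total=10 * 2**20, desc=f"{'encode':>7}", unit_scale=True, unit="B", smoothing=0.1)
for _ in range(10):
    time.sleep(0.01)
    bar.update(2**20)
bar.close()
```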
@@ -216,7 +219,19 @@ def _update_progress_worker(self):
         logger.debug("Exit progress thread")
 
     def submit(self, *args, **kwargs):
-        self.futures.append(self.executor.submit(*args, **kwargs))
+        future = self.executor.submit(*args, **kwargs)
+        self.futures.add(future)
+        return future
+
+    def wait_for_completed(self, timeout=None):
+        done, not_done = cf.wait(self.futures, timeout, cf.FIRST_COMPLETED)
+        for future in done:
+            exception = future.exception()
+            # TODO do the check for BrokenProcessPool here
+            if exception is not None:
+                raise exception
+        self.futures = not_done
+        return done
 
     def results_as_completed(self):
         for future in cf.as_completed(self.futures):
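The new `wait_for_completed` method is what makes --max-memory enforceable: a producer can stop submitting flush jobs once the bytes pinned by in-flight buffers reach the budget, block until at least one future completes, and then continue. A rough sketch of that pattern (the encode-side bookkeeping is an assumption, not code from this PR; `manager` is a ParallelWorkManager-style object exposing `submit` and `wait_for_completed` as in the diff above, and `flush` is whatever callable writes a buffer to zarr):

```python
def encode_with_memory_bound(manager, buffers, flush, max_memory_bytes):
    """Submit flush(buffer) jobs while keeping buffered bytes under budget."""
    in_flight = {}  # future -> bytes pinned by that job's buffer
    for buffer in buffers:
        # Block while admitting this buffer would exceed the budget.
        while in_flight and sum(in_flight.values()) + buffer.nbytes > max_memory_bytes:
            for done in manager.wait_for_completed():
                in_flight.pop(done, None)
        future = manager.submit(flush, buffer)
        in_flight[future] = buffer.nbytes
    # Drain remaining work, surfacing any worker exceptions.
    while in_flight:
        for done in manager.wait_for_completed():
            in_flight.pop(done, None)
```

Because `wait_for_completed` re-raises the first exception it finds among completed futures, a loop like this also propagates worker failures promptly instead of discovering them at the end of the run.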